1 - Publish and subscribe overview

Overview of the pub/sub API building block

Publish and subscribe (pub/sub) enables microservices to communicate with each other using messages for event-driven architectures.

  • The producer, or publisher, writes messages to an input channel and sends them to a topic, unaware which application will receive them.
  • The consumer, or subscriber, subscribes to the topic and receives messages from an output channel, unaware which service produced these messages.

An intermediary message broker copies each message from a publisher’s input channel to an output channel for all subscribers interested in that message. This pattern is especially useful when you need to decouple microservices from one another.
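
The fan-out behavior described above can be pictured with a toy, in-memory broker. This is only a sketch of the pattern, not how Dapr or any real broker is implemented; the class name `InMemoryBroker` and the inbox lists are hypothetical names chosen for illustration.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[dict], None]


class InMemoryBroker:
    """Toy broker: copies each published message to every subscriber of a topic."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: dict) -> int:
        # The publisher does not know who the subscribers are; it only hands
        # the message to the broker, which fans it out.
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(dict(message))  # each subscriber receives its own copy
        return len(handlers)


broker = InMemoryBroker()
shipping_inbox: List[dict] = []
email_inbox: List[dict] = []
broker.subscribe("orders", shipping_inbox.append)
broker.subscribe("orders", email_inbox.append)
delivered = broker.publish("orders", {"orderId": 100})
```

Neither subscriber references the publisher, and the publisher never references a subscriber: both sides only know the topic name, which is the decoupling the pattern is meant to provide.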



Pub/sub API

The pub/sub API in Dapr:

  • Provides a platform-agnostic API to send and receive messages.
  • Offers at-least-once message delivery guarantee.
  • Integrates with various message brokers and queuing systems.

The specific message broker used by your service is pluggable and configured as a Dapr pub/sub component at runtime. This removes the dependency from your service and makes your service more portable and flexible to changes.

When using pub/sub in Dapr:

  1. Your service makes a network call to a Dapr pub/sub building block API.
  2. The pub/sub building block makes calls into a Dapr pub/sub component that encapsulates a specific message broker.
  3. To receive messages on a topic, Dapr subscribes to the pub/sub component on behalf of your service with a topic and delivers the messages to an endpoint on your service when they arrive.

The following overview video and demo demonstrate how Dapr pub/sub works.

In the diagram below, a “shipping” service and an “email” service have both subscribed to topics published by a “cart” service. Each service loads pub/sub component configuration files that point to the same pub/sub message broker component, for example: Redis Streams, NATS Streaming, Azure Service Bus, or GCP Pub/Sub.

In the diagram below, the Dapr API posts an “order” topic from the publishing “cart” service to “order” endpoints on the “shipping” and “email” subscribing services.

View the complete list of pub/sub components that Dapr supports.

Features

The pub/sub API building block brings several features to your application.

Sending messages using CloudEvents

To enable message routing and provide additional context with each message between services, Dapr uses the CloudEvents 1.0 specification as its message format. Any message sent by an application to a topic using Dapr is automatically wrapped in a CloudEvents envelope, using the Content-Type header value for the datacontenttype attribute.

For more information, read about messaging with CloudEvents, or sending raw messages without CloudEvents.

Communication with applications not using Dapr and CloudEvents

If one of your applications uses Dapr while another doesn’t, you can disable the CloudEvent wrapping for a publisher or subscriber. This allows partial adoption of Dapr pub/sub in applications that cannot adopt Dapr all at once.

For more information, read how to use pub/sub without CloudEvents.

Setting message content types

When publishing a message, it’s important to specify the content type of the data being sent. Unless specified, Dapr will assume text/plain.

  • HTTP client: the content type can be set in a Content-Type header
  • gRPC client and SDK: have a dedicated content type parameter
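
For the HTTP case, a minimal sketch of setting the content type might look like the following. It only builds the request and does not send it. The sidecar port (3500) and the component and topic names are assumptions for illustration; the path follows the `/v1.0/publish/<pubsub name>/<topic>` form of the Dapr publish endpoint.

```python
import json
import urllib.request

# Assumed values for illustration: sidecar HTTP port, component name, topic name.
DAPR_HTTP_PORT = 3500
PUBSUB_NAME = "order-pub-sub"
TOPIC_NAME = "orders"


def build_publish_request(payload: dict) -> urllib.request.Request:
    """Build (but do not send) a publish request with an explicit content type."""
    url = f"http://localhost:{DAPR_HTTP_PORT}/v1.0/publish/{PUBSUB_NAME}/{TOPIC_NAME}"
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        # Without this header, Dapr would assume text/plain.
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_publish_request({"orderId": "100"})
# With a sidecar running, the request could be sent with:
#   urllib.request.urlopen(request)
```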

Message delivery

In principle, Dapr considers a message successfully delivered once the subscriber processes the message and responds with a non-error response. For more granular control, Dapr’s pub/sub API also provides explicit statuses, defined in the response payload, with which the subscriber indicates specific handling instructions to Dapr (for example, RETRY or DROP).
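
As a rough sketch of the explicit statuses, a subscriber could map the outcome of its own processing to a JSON body with a `status` field. The mapping of exception types to RETRY or DROP below is an assumption made for illustration, as are the function names; your application decides which failures are worth retrying.

```python
import json
from typing import Callable


def handle_event(event: dict, process: Callable[[dict], None]) -> str:
    """Run the app's processing logic and return a JSON status body for Dapr."""
    try:
        process(event)
        status = "SUCCESS"
    except ValueError:
        # Malformed message: retrying will not help, so ask Dapr to drop it.
        status = "DROP"
    except Exception:
        # Anything else is treated as transient here: ask Dapr to redeliver.
        status = "RETRY"
    return json.dumps({"status": status})


def process_order(event: dict) -> None:
    # Hypothetical business logic: reject events without an order ID.
    if "orderId" not in event.get("data", {}):
        raise ValueError("missing orderId")


def database_unavailable(event: dict) -> None:
    # Hypothetical transient failure.
    raise ConnectionError("database is unreachable")


ok_body = handle_event({"data": {"orderId": 1}}, process_order)
drop_body = handle_event({"data": {}}, process_order)
retry_body = handle_event({"data": {"orderId": 1}}, database_unavailable)
```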

Receiving messages with topic subscriptions

Dapr applications can subscribe to published topics via three subscription types that support the same features: declarative, streaming and programmatic.

Subscription type | Description
Declarative | The subscription is defined in an external file. The declarative approach removes the Dapr dependency from your code and allows for existing applications to subscribe to topics, without having to change code.
Streaming | The subscription is defined in the user code. Streaming subscriptions are dynamic, meaning they allow for adding or removing subscriptions at runtime. They do not require a subscription endpoint in your application (that is required by both programmatic and declarative subscriptions), making them easy to configure in code. Streaming subscriptions also do not require an app to be configured with the sidecar to receive messages. With streaming subscriptions, since messages are sent to a message handler code, there is no concept of routes or bulk subscriptions.
Programmatic | The subscription is defined in the user code. The programmatic approach implements the static subscription and requires an endpoint in your code.

For more information, read about the subscriptions in Subscription Types.

Reloading topic subscriptions

To reload topic subscriptions that are defined programmatically or declaratively, the Dapr sidecar needs to be restarted. The Dapr sidecar can be made to dynamically reload changed declarative topic subscriptions without restarting by enabling the HotReload feature gate. Hot reloading of topic subscriptions is currently a preview feature. In-flight messages are unaffected when reloading a subscription.

Message routing

Dapr provides a content-based routing pattern. Pub/sub routing is an implementation of this pattern that allows developers to use expressions to route CloudEvents based on their contents to different URIs/paths and event handlers in your application. If no route matches, an optional default route is used. This is useful as your application expands to support multiple event versions or special cases.

This feature is available to both the declarative and programmatic subscription approaches.
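
The routing decision itself can be sketched in a few lines. In Dapr, the rules are expressions declared in the subscription; the Python predicates, paths, and the `select_route` helper below are hypothetical stand-ins that only mirror the "first matching rule, otherwise the default route" logic.

```python
from typing import Callable, List, Optional, Tuple

Rule = Tuple[Callable[[dict], bool], str]


def select_route(event: dict, rules: List[Rule], default: Optional[str] = None) -> Optional[str]:
    """Return the path of the first rule whose predicate matches, else the default."""
    for matches, path in rules:
        if matches(event):
            return path
    return default


# Hypothetical rules for illustration only.
rules: List[Rule] = [
    (lambda e: e.get("type") == "widget", "/widgets"),
    (lambda e: e.get("type") == "gadget", "/gadgets"),
]

widget_path = select_route({"type": "widget"}, rules, default="/products")
fallback_path = select_route({"type": "other"}, rules, default="/products")
unrouted = select_route({"type": "other"}, rules)
```

Because the default route is optional, an event that matches no rule and has no default is simply not routed to any handler in this sketch.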

For more information on message routing, read the Dapr pub/sub API reference.

Handling failed messages with dead letter topics

Sometimes, messages can’t be processed because of a variety of possible issues, such as erroneous conditions within the producer or consumer application or an unexpected state change that causes an issue with your application code. Dapr allows developers to set dead letter topics to deal with messages that cannot be delivered to an application. This feature is available on all pub/sub components and prevents consumer applications from endlessly retrying a failed message. For more information, read about dead letter topics.

Enabling the outbox pattern

Dapr enables developers to use the outbox pattern for achieving a single transaction across a transactional state store and any message broker. For more information, read How to enable transactional outbox messaging.

Namespace consumer groups

Dapr solves multi-tenancy at-scale with namespaces for consumer groups. Simply include the "{namespace}" value in your component metadata for consumer groups to allow multiple namespaces with applications of the same app-id to publish and subscribe to the same message broker.

At-least-once guarantee

Dapr guarantees at-least-once semantics for message delivery. When an application publishes a message to a topic using the pub/sub API, Dapr ensures the message is delivered at least once to every subscriber.

Even if the message fails to deliver, or your application crashes, Dapr attempts to redeliver the message until successful delivery.

All Dapr pub/sub components support the at-least-once guarantee.

Consumer groups and competing consumers pattern

Dapr handles the burden of dealing with consumer groups and the competing consumers pattern. In the competing consumers pattern, multiple application instances using a single consumer group compete for the message. Dapr enforces the competing consumer pattern when replicas use the same app-id without explicit consumer group overrides.

When multiple instances of the same application (with same app-id) subscribe to a topic, Dapr delivers each message to only one instance of that application. This concept is illustrated in the diagram below.



Similarly, if two different applications (with different app-id) subscribe to the same topic, Dapr delivers each message to only one instance of each application.
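
The two delivery rules above can be modeled with a small simulation. This is a toy model, not Dapr's implementation: the `Topic` class is hypothetical, and the round-robin choice of instance is an assumption, since the actual distribution across instances is up to the message broker.

```python
from collections import defaultdict
from itertools import count
from typing import DefaultDict, List


class Topic:
    """Toy topic: one consumer group per app-id, one receiver per group per message."""

    def __init__(self) -> None:
        # app-id -> list of per-instance inboxes
        self._groups: DefaultDict[str, List[List[int]]] = defaultdict(list)
        self._counter = count()

    def add_instance(self, app_id: str) -> List[int]:
        inbox: List[int] = []
        self._groups[app_id].append(inbox)
        return inbox

    def publish(self, message: int) -> None:
        n = next(self._counter)
        for instances in self._groups.values():
            # Exactly one instance of each app-id receives the message.
            instances[n % len(instances)].append(message)


orders = Topic()
shipping_a = orders.add_instance("shipping")
shipping_b = orders.add_instance("shipping")
email_a = orders.add_instance("email")

for order_id in (1, 2, 3, 4):
    orders.publish(order_id)
```

In this model, every order lands in exactly one of the two "shipping" inboxes, while the single "email" instance sees all of them.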

Not all Dapr pub/sub components support the competing consumer pattern. Currently, the following (non-exhaustive) pub/sub components support this:

Scoping topics for added security

By default, all topic messages associated with an instance of a pub/sub component are available to every application configured with that component. You can limit which application can publish or subscribe to topics with Dapr topic scoping. For more information, read: pub/sub topic scoping.

Message Time-to-Live (TTL)

Dapr can set a timeout on a per-message basis, meaning that if the message is not read from the pub/sub component before the timeout expires, the message is discarded. This timeout prevents a buildup of unread messages. If a message has been in the queue longer than the configured TTL, it is marked as dead. For more information, read pub/sub message TTL.

Publish and subscribe to bulk messages

Dapr supports sending and receiving multiple messages in a single request. When writing applications that need to send or receive a large number of messages, using bulk operations allows achieving high throughput by reducing the overall number of requests. For more information, read pub/sub bulk messages.

Scaling subscribers with StatefulSets

When running on Kubernetes, subscribers can have a sticky consumerID per instance when using StatefulSets in combination with the {podName} marker. See how to horizontally scale subscribers with StatefulSets.

Try out pub/sub

Quickstarts and tutorials

Want to put the Dapr pub/sub API to the test? Walk through the following quickstart and tutorials to see pub/sub in action:

Quickstart/tutorial | Description
Pub/sub quickstart | Send and receive messages using the publish and subscribe API.
Pub/sub tutorial | Demonstrates how to use Dapr to enable pub-sub applications. Uses Redis as a pub-sub component.

Start using pub/sub directly in your app

Want to skip the quickstarts? Not a problem. You can try out the pub/sub building block directly in your application to publish messages and subscribe to a topic. After Dapr is installed, you can begin using the pub/sub API starting with the pub/sub how-to guide.

Next steps

2 - How to: Publish a message and subscribe to a topic

Learn how to send messages to a topic with one service and subscribe to that topic in another service

Now that you’ve learned what the Dapr pub/sub building block provides, learn how it can work in your service. The below code example loosely describes an application that processes orders with two services, each with Dapr sidecars:

  • A checkout service using Dapr to subscribe to the topic in the message queue.
  • An order processing service using Dapr to publish a message to RabbitMQ.

Dapr automatically wraps the user payload in a CloudEvents v1.0 compliant envelope, using the Content-Type header value for the datacontenttype attribute. Learn more about messages with CloudEvents.

The following example demonstrates how your applications publish and subscribe to a topic called orders.

Set up the Pub/Sub component

The first step is to set up the pub/sub component:

When you run dapr init, Dapr creates a default Redis pubsub.yaml component file and runs a Redis container on your local machine. The component file is located:

  • On Windows, under %UserProfile%\.dapr\components\pubsub.yaml
  • On Linux/MacOS, under ~/.dapr/components/pubsub.yaml

With the pubsub.yaml component, you can easily swap out underlying components without application code changes. In this example, RabbitMQ is used.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: order-pub-sub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://localhost:5672"
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: reconnectWait
    value: "0"
  - name: concurrency
    value: parallel
scopes:
  - orderprocessing
  - checkout

You can override this file with another pubsub component by creating a components directory (in this example, myComponents) containing the file and using the flag --resources-path with the dapr run CLI command.

To deploy this into a Kubernetes cluster, fill in the metadata connection details of the pub/sub component in the YAML below, save as pubsub.yaml, and run kubectl apply -f pubsub.yaml.

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: order-pub-sub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: connectionString
    value: "amqp://localhost:5672"
  - name: protocol
    value: amqp  
  - name: hostname
    value: localhost 
  - name: username
    value: username
  - name: password
    value: password 
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: reconnectWait
    value: "0"
  - name: concurrency
    value: parallel
scopes:
  - orderprocessing
  - checkout
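
To run your application in self-hosted mode with components from a custom directory (myComponents in this example), pass the --resources-path flag to dapr run. Use the command that matches your language:

dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
dapr run --app-id myapp --resources-path ./myComponents -- go run app.go
dapr run --app-id myapp --resources-path ./myComponents -- npm start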

Subscribe to topics

Dapr provides three methods by which you can subscribe to topics:

  • Declaratively, where subscriptions are defined in an external file.
  • Streaming, where subscriptions are defined in user code.
  • Programmatically, where subscriptions are defined in user code.

Learn more in the declarative, streaming, and programmatic subscriptions doc. This example demonstrates a declarative subscription.

Create a file named subscription.yaml and paste the following:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes: 
    default: /checkout
  pubsubname: order-pub-sub
scopes:
- orderprocessing
- checkout

The example above shows an event subscription to topic orders, for the pubsub component order-pub-sub.

  • The routes.default field tells Dapr to send all topic messages to the /checkout endpoint in the app.
  • The scopes field enables this subscription for apps with IDs orderprocessing and checkout.

Place subscription.yaml in the same directory as your pubsub.yaml component. When Dapr starts up, it loads subscriptions along with the components.

Below are code examples that leverage Dapr SDKs to subscribe to the topic you defined in subscription.yaml.

using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.Mvc;
using Dapr;
using Dapr.Client;

namespace CheckoutService.Controllers;

[ApiController]
public sealed class CheckoutServiceController : ControllerBase
{
    //Subscribe to a topic called "orders" from the "order-pub-sub" component
    [Topic("order-pub-sub", "orders")]
    [HttpPost("checkout")]
    public void GetCheckout([FromBody] int orderId)
    {
        Console.WriteLine("Subscriber received : " + orderId);
    }
}

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-protocol https dotnet run
//dependencies
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import org.springframework.web.bind.annotation.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

//code
@RestController
public class CheckoutServiceController {

    private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
     //Subscribe to a topic
    @Topic(name = "orders", pubsubName = "order-pub-sub")
    @PostMapping(path = "/checkout")
    public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
        return Mono.fromRunnable(() -> {
            try {
                log.info("Subscriber received: " + cloudEvent.getData());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 mvn spring-boot:run
#dependencies
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
import logging
import json

#code
app = App()
logging.basicConfig(level = logging.INFO)
#Subscribe to a topic 
@app.subscribe(pubsub_name='order-pub-sub', topic='orders')
def mytopic(event: v1.Event) -> None:
    data = json.loads(event.Data())
    logging.info('Subscriber received: ' + str(data))

app.run(6002)

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --app-protocol grpc -- python3 CheckoutService.py
//dependencies
import (
	"log"
	"net/http"
	"context"

	"github.com/dapr/go-sdk/service/common"
	daprd "github.com/dapr/go-sdk/service/http"
)

//code
var sub = &common.Subscription{
	PubsubName: "order-pub-sub",
	Topic:      "orders",
	Route:      "/checkout",
}

func main() {
	s := daprd.NewService(":6002")
   //Subscribe to a topic
	if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
		log.Fatalf("error adding topic subscription: %v", err)
	}
	if err := s.Start(); err != nil && err != http.ErrServerClosed {
		log.Fatalf("error listening: %v", err)
	}
}

func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
	log.Printf("Subscriber received: %s", e.Data)
	return false, nil
}

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 go run CheckoutService.go
//dependencies
import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr'; 

//code
const daprHost = "127.0.0.1"; 
const serverHost = "127.0.0.1";
const serverPort = "6002"; 

start().catch((e) => {
    console.error(e);
    process.exit(1);
});

async function start() {
    const server = new DaprServer({
        serverHost,
        serverPort,
        communicationProtocol: CommunicationProtocolEnum.HTTP,
        clientOptions: {
          daprHost,
          daprPort: process.env.DAPR_HTTP_PORT,
        },
    });
    //Subscribe to a topic
    await server.pubsub.subscribe("order-pub-sub", "orders", async (orderId) => {
        console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
    });
    await server.start();
}

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:

dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 npm start

Publish a message

Start an instance of Dapr with an app-id called orderprocessing:

dapr run --app-id orderprocessing --dapr-http-port 3601

Then publish a message to the orders topic:

dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{"orderId": "100"}'
curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/json" -d '{"orderId": "100"}'
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'

Below are code examples that leverage Dapr SDKs to publish a message to a topic.

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using System.Threading;

const string PUBSUB_NAME = "order-pub-sub";
const string TOPIC_NAME = "orders";

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprClient();

var app = builder.Build();
var random = new Random();

var client = app.Services.GetRequiredService<DaprClient>();

while(true) {
    await Task.Delay(TimeSpan.FromSeconds(5));
    var orderId = random.Next(1,1000);
    var source = new CancellationTokenSource();
    var cancellationToken = source.Token;
    
    //Using Dapr SDK to publish a topic
    await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
    Console.WriteLine("Published data: " + orderId);
}

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --app-protocol https dotnet run
//dependencies
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Metadata;
import static java.util.Collections.singletonMap;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.TimeUnit;

//code
@SpringBootApplication
public class OrderProcessingServiceApplication {

	private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

	public static void main(String[] args) throws InterruptedException{
		String MESSAGE_TTL_IN_SECONDS = "1000";
		String TOPIC_NAME = "orders";
		String PUBSUB_NAME = "order-pub-sub";

		while(true) {
			TimeUnit.MILLISECONDS.sleep(5000);
			Random random = new Random();
			int orderId = random.nextInt(1000-1) + 1;
			DaprClient client = new DaprClientBuilder().build();
      //Using Dapr SDK to publish a topic
			client.publishEvent(
					PUBSUB_NAME,
					TOPIC_NAME,
					orderId,
					singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
			log.info("Published data:" + orderId);
		}
	}
}

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
#dependencies  
import random
from time import sleep    
import requests
import logging
import json
from dapr.clients import DaprClient

#code
logging.basicConfig(level = logging.INFO)
while True:
    sleep(random.randrange(50, 5000) / 1000)
    orderId = random.randint(1, 1000)
    PUBSUB_NAME = 'order-pub-sub'
    TOPIC_NAME = 'orders'
    with DaprClient() as client:
        #Using Dapr SDK to publish a topic
        result = client.publish_event(
            pubsub_name=PUBSUB_NAME,
            topic_name=TOPIC_NAME,
            data=json.dumps(orderId),
            data_content_type='application/json',
        )
    logging.info('Published data: ' + str(orderId))

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-protocol grpc python3 OrderProcessingService.py
//dependencies
import (
	"context"
	"log"
	"math/rand"
	"time"
	"strconv"
	dapr "github.com/dapr/go-sdk/client"
)

//code
var (
	PUBSUB_NAME = "order-pub-sub"
	TOPIC_NAME  = "orders"
)

func main() {
	// Create the client once and reuse it for every publish call.
	client, err := dapr.NewClient()
	if err != nil {
		panic(err)
	}
	defer client.Close()
	ctx := context.Background()

	for i := 0; i < 10; i++ {
		// time.Sleep takes a Duration; a bare 5000 would mean 5000 nanoseconds.
		time.Sleep(5 * time.Second)
		orderId := rand.Intn(1000-1) + 1
		//Using Dapr SDK to publish a topic
		if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId))); err != nil {
			panic(err)
		}

		log.Println("Published data: " + strconv.Itoa(orderId))
	}
}

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
//dependencies
import { DaprClient, CommunicationProtocolEnum } from '@dapr/dapr'; 

const daprHost = "127.0.0.1"; 

async function main() {
    for (let i = 0; i < 10; i++) {
        // Await the delay so that messages are published five seconds apart
        await sleep(5000);
        const orderId = Math.floor(Math.random() * (1000 - 1) + 1);
        await start(orderId);
    }
}

async function start(orderId) {
    const PUBSUB_NAME = "order-pub-sub"
    const TOPIC_NAME  = "orders"
    const client = new DaprClient({
        daprHost,
        daprPort: process.env.DAPR_HTTP_PORT, 
        communicationProtocol: CommunicationProtocolEnum.HTTP
    });
    //Using Dapr SDK to publish a topic
    await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId);
    // Log only after the publish call has completed
    console.log("Published data:" + orderId)
}

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

main().catch((e) => {
    console.error(e);
    process.exit(1);
});

Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:

dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start

Message acknowledgement and retries

In order to tell Dapr that a message was processed successfully, return a 200 OK response. If Dapr receives any other return status code than 200, or if your app crashes, Dapr will attempt to redeliver the message following at-least-once semantics.
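
One consequence of this rule is that a handler can see the same message more than once. The loop below is a simplified simulation of redelivery, not Dapr's actual retry logic: the `deliver_until_acknowledged` helper, the attempt limit, and the flaky handler are all hypothetical.

```python
from typing import Callable, List


def deliver_until_acknowledged(
    handler: Callable[[dict], int], message: dict, max_attempts: int = 5
) -> int:
    """Call the handler until it returns 200; return the number of attempts used."""
    for attempt in range(1, max_attempts + 1):
        if handler(message) == 200:
            return attempt
    raise RuntimeError("message was never acknowledged")


calls: List[str] = []


def flaky_checkout(message: dict) -> int:
    calls.append(message["orderId"])
    # Fail the first two deliveries to mimic a transient error, then acknowledge.
    return 200 if len(calls) >= 3 else 500


attempts = deliver_until_acknowledged(flaky_checkout, {"orderId": "100"})
```

Here the handler is invoked three times for a single published message, which is why subscriber logic is usually written to tolerate repeated deliveries.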

Demo video

Watch this demo video to learn more about pub/sub messaging with Dapr.

Next steps

3 - Publishing & subscribing messages with CloudEvents

Learn why Dapr uses CloudEvents, how they work in Dapr pub/sub, and how to create CloudEvents.

To enable message routing and provide additional context with each message, Dapr uses the CloudEvents 1.0 specification as its message format. Any message sent by an application to a topic using Dapr is automatically wrapped in a CloudEvents envelope, using the Content-Type header value for the datacontenttype attribute.

Dapr uses CloudEvents to provide additional context to the event payload, enabling features like:

  • Tracing
  • Content-type for proper deserialization of event data
  • Verification of sender application

You can choose any of three methods for publishing a CloudEvent via pub/sub:

  1. Send a pub/sub event, which is then wrapped by Dapr in a CloudEvent envelope.
  2. Replace specific CloudEvents attributes provided by Dapr by overriding the standard CloudEvent properties.
  3. Write your own CloudEvent envelope as part of the pub/sub event.

Dapr-generated CloudEvents example

Sending a publish operation to Dapr automatically wraps it in a CloudEvent envelope containing the following fields:

  • id
  • source
  • specversion
  • type
  • traceparent
  • traceid
  • tracestate
  • topic
  • pubsubname
  • time
  • datacontenttype (optional)

The following example demonstrates a CloudEvent generated by Dapr for a publish operation to the orders topic that includes:

  • A W3C traceid unique to the message
  • The data and the fields for the CloudEvent where the data content is serialized as JSON
{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data": {
    "orderId": 1
  },
  "id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
  "specversion": "1.0",
  "datacontenttype": "application/json; charset=utf-8",
  "source": "checkout",
  "type": "com.dapr.event.sent",
  "time": "2020-09-23T06:23:21Z",
  "traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}

As another example of a v1.0 CloudEvent, the following shows data as XML content in a CloudEvent message serialized as JSON:

{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data": "<note><to></to><from>user2</from><message>Order</message></note>",
  "id": "id-1234-5678-9101",
  "specversion": "1.0",
  "datacontenttype": "text/xml",
  "subject": "Test XML Message",
  "source": "https://example.com/message",
  "type": "xml.message",
  "time": "2020-09-23T06:23:21Z"
}
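
On the receiving side, a subscriber that handles the raw envelope itself could unpack it roughly as follows. This is a minimal sketch using only the standard library; the `unwrap` helper is hypothetical, and the sample bodies are trimmed-down versions of the two envelopes above.

```python
import json
from typing import Any, Tuple


def unwrap(raw_body: bytes) -> Tuple[str, str, Any]:
    """Parse a JSON-serialized CloudEvent and return (id, datacontenttype, data)."""
    envelope = json.loads(raw_body)
    return envelope["id"], envelope.get("datacontenttype", ""), envelope.get("data")


json_event = (
    b'{"id": "5929aaac-a5e2-4ca1-859c-edfe73f11565", "specversion": "1.0", '
    b'"datacontenttype": "application/json; charset=utf-8", "data": {"orderId": 1}}'
)
xml_event = (
    b'{"id": "id-1234-5678-9101", "specversion": "1.0", "datacontenttype": "text/xml", '
    b'"data": "<note><to></to><from>user2</from><message>Order</message></note>"}'
)

json_id, json_type, json_data = unwrap(json_event)
xml_id, xml_type, xml_data = unwrap(xml_event)
```

With JSON content, data is already a decoded object after parsing the envelope; with XML content, data is a string that still needs to be parsed by the application, which is why the datacontenttype attribute matters for deserialization.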

Replace Dapr generated CloudEvents values

Dapr automatically generates several CloudEvent properties. You can replace these generated CloudEvent properties by providing the following optional metadata key/value pairs:

  • cloudevent.id: overrides id
  • cloudevent.source: overrides source
  • cloudevent.type: overrides type
  • cloudevent.traceid: overrides traceid
  • cloudevent.tracestate: overrides tracestate
  • cloudevent.traceparent: overrides traceparent

The ability to replace CloudEvents properties using these metadata properties applies to all pub/sub components.

Example

For example, to replace the source and id values from the CloudEvent example above in code:

import json
from dapr.clients import DaprClient

i = 1  # example order ID

with DaprClient() as client:
    order = {'orderId': i}
    # Publish an event/message using Dapr PubSub, overriding the CloudEvent id and source
    result = client.publish_event(
        pubsub_name='order_pub_sub',
        topic_name='orders',
        data=json.dumps(order),
        data_content_type='application/json',
        publish_metadata={'cloudevent.id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317', 'cloudevent.source': 'payment'}
    )

    # or

    cloud_event = {
        'specversion': '1.0',
        'type': 'com.example.event',
        'source': 'payment',
        'id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317',
        'data': {'orderId': i},
        'datacontenttype': 'application/json',
        # ...
    }

    # Set the data content type to 'application/cloudevents+json'
    result = client.publish_event(
        pubsub_name='order_pub_sub',
        topic_name='orders',
        data=json.dumps(cloud_event),
        data_content_type='application/cloudevents+json',
    )
var order = new Order(i);
using var client = new DaprClientBuilder().Build();

// Override cloudevent metadata
var metadata = new Dictionary<string,string>() {
    { "cloudevent.source", "payment" },
    { "cloudevent.id", "d99b228f-6c73-4e78-8c4d-3f80a043d317" }
};

// Publish an event/message using Dapr PubSub
await client.PublishEventAsync("order_pub_sub", "orders", order, metadata);
Console.WriteLine("Published data: " + order);

await Task.Delay(TimeSpan.FromSeconds(1));

The JSON payload then reflects the new source and id values:

{
  "topic": "orders",
  "pubsubname": "order_pub_sub",
  "traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
  "tracestate": "",
  "data": {
    "orderId": 1
  },
  "id": "d99b228f-6c73-4e78-8c4d-3f80a043d317",
  "specversion": "1.0",
  "datacontenttype": "application/json; charset=utf-8",
  "source": "payment",
  "type": "com.dapr.event.sent",
  "time": "2020-09-23T06:23:21Z",
  "traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}

Publish your own CloudEvent

If you want to use your own CloudEvent, make sure to specify the datacontenttype as application/cloudevents+json.

If the CloudEvent that was authored by the app does not contain the minimum required fields in the CloudEvent specification, the message is rejected. Dapr adds the following fields to the CloudEvent if they are missing:

  • time
  • traceid
  • traceparent
  • tracestate
  • topic
  • pubsubname
  • source
  • type
  • specversion

You can add additional fields to a custom CloudEvent that are not part of the official CloudEvent specification. Dapr will pass these fields as-is.
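
Before publishing a custom CloudEvent, it can help to check it locally. The sketch below checks for the context attributes that the CloudEvents 1.0 specification marks as required (id, source, specversion, type); whether Dapr rejects or fills in a given missing field is described above, so treat this as a conservative pre-flight check. The event type and the extra `priority` field are hypothetical.

```python
import json
from typing import List

# Required context attributes per the CloudEvents 1.0 specification.
REQUIRED_ATTRIBUTES = ("id", "source", "specversion", "type")


def missing_attributes(event: dict) -> List[str]:
    """Return the required CloudEvents attributes that are absent or empty."""
    return [name for name in REQUIRED_ATTRIBUTES if not event.get(name)]


cloud_event = {
    "specversion": "1.0",
    "type": "com.example.order.created",  # hypothetical event type
    "source": "orderprocessing",
    "id": "order-100",
    "datacontenttype": "application/json",
    "data": {"orderId": "100"},
    "priority": "high",  # custom field outside the specification
}

problems = missing_attributes(cloud_event)
incomplete = missing_attributes({"id": "order-101"})

# The serialized event is what would be published with the
# application/cloudevents+json content type.
body = json.dumps(cloud_event)
```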

Example

Publish a CloudEvent to the orders topic using the Dapr CLI:

dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{\"orderId\": \"100\"}'

Publish a CloudEvent to the orders topic using curl:

curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'

Publish a CloudEvent to the orders topic using PowerShell:

Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'

Event deduplication

When using cloud events created by Dapr, the envelope contains an id field which can be used by the app to perform message deduplication. Dapr does not handle deduplication automatically. Dapr supports using message brokers that natively enable message deduplication.
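
Because deduplication is left to the app, a subscriber can track the identifiers it has already handled. The following is a minimal in-memory sketch, assuming a single process and that the (source, id) pair identifies an event. The Deduplicator class and its max_entries limit are hypothetical. A real service would more likely use a shared store with expiry.

```python
from collections import OrderedDict


class Deduplicator:
    """Remember recently seen (source, id) pairs, evicting the oldest first."""

    def __init__(self, max_entries=10000):
        self._seen = OrderedDict()
        self._max_entries = max_entries

    def is_duplicate(self, event):
        key = (event.get('source'), event.get('id'))
        if key in self._seen:
            return True
        self._seen[key] = None
        if len(self._seen) > self._max_entries:
            self._seen.popitem(last=False)  # drop the oldest key
        return False


dedup = Deduplicator()
event = {'source': 'payment', 'id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317'}
first = dedup.is_duplicate(event)   # first delivery
second = dedup.is_duplicate(event)  # redelivery of the same event
```

Here first is False and second is True, so a handler could skip processing on the second delivery and still acknowledge the message.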

4 - Publishing & subscribing messages without CloudEvents

Learn when you might not use CloudEvents and how to disable them.

When adding Dapr to your application, some services may still need to communicate via pub/sub messages not encapsulated in CloudEvents, due to either compatibility reasons or some apps not using Dapr. These are referred to as “raw” pub/sub messages. Dapr enables apps to publish and subscribe to raw events not wrapped in a CloudEvent for compatibility and to send data that is not JSON serializable.

Publishing raw messages

Dapr apps are able to publish raw events to pub/sub topics without CloudEvent encapsulation, for compatibility with non-Dapr apps.

Diagram showing how to publish with Dapr when subscriber does not use Dapr or CloudEvent

To disable CloudEvent wrapping, set the rawPayload metadata to true as part of the publishing request. This allows subscribers to receive these messages without having to parse the CloudEvent schema.

curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"order-number": "345"}'
using Dapr.Client;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers().AddDapr();

var app = builder.Build();

app.MapPost("/publish", async (DaprClient daprClient) =>
{
    var message = new Message(
        Guid.NewGuid().ToString(),
        $"Hello at {DateTime.UtcNow}",
        DateTime.UtcNow
    );

    await daprClient.PublishEventAsync(
        "pubsub",           // pubsub name
        "messages",         // topic name
        message,           // message data
        new Dictionary<string, string> 
        { 
            { "rawPayload", "true" },
            { "content-type", "application/json" }
        }
    );
    
    return Results.Ok(message);
});

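app.Run();

// Message type assumed by this example (not defined in the original snippet)
record Message(string Id, string Content, DateTime Timestamp);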
import json

from dapr.clients import DaprClient

with DaprClient() as d:
    req_data = {
        'order-number': '345'
    }
    # Create a typed message with content type and body
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic_name='TOPIC_A',
        data=json.dumps(req_data),
        publish_metadata={'rawPayload': 'true'}
    )
    # Print the request
    print(req_data, flush=True)
<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory) {
    $publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
    $publisher->topic('TOPIC_A')->publish('data', ['rawPayload' => 'true']);
});
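
When calling the HTTP API directly, the rawPayload flag travels as a query parameter, as in the curl example above. The sketch below only builds that URL. The publish_url helper is hypothetical, and the port 3500 and the names pubsub and TOPIC_A are taken from the curl example.

```python
from urllib.parse import quote, urlencode


def publish_url(pubsub_name, topic, raw_payload=False, dapr_port=3500):
    """Build the Dapr HTTP publish URL, optionally requesting a raw payload."""
    url = f'http://localhost:{dapr_port}/v1.0/publish/{quote(pubsub_name)}/{quote(topic)}'
    if raw_payload:
        url += '?' + urlencode({'metadata.rawPayload': 'true'})
    return url


print(publish_url('pubsub', 'TOPIC_A', raw_payload=True))
# http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.rawPayload=true
```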

Subscribing to raw messages

Dapr apps can subscribe to raw messages from pub/sub topics, even if they weren’t published as CloudEvents. However, the subscribing Dapr process still wraps these raw messages in a CloudEvent before delivering them to the subscribing application.

Diagram showing how to subscribe with Dapr when publisher does not use Dapr or CloudEvent

Programmatically subscribe to raw events

When subscribing programmatically, add the additional metadata entry for rawPayload to allow the subscriber to receive a message that is not wrapped by a CloudEvent. For .NET, this metadata entry is called isRawPayload.

When using raw payloads, the message is always base64-encoded with content type application/octet-stream.

using System.Text.Json;
using System.Text.Json.Serialization;

var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

app.MapGet("/dapr/subscribe", () =>
{
    var subscriptions = new[]
    {
        new
        {
            pubsubname = "pubsub",
            topic = "messages",
            route = "/messages",
            metadata = new Dictionary<string, string>
            {
                { "isRawPayload", "true" },
                { "content-type", "application/json" }
            }
        }
    };
    return Results.Ok(subscriptions);
});

app.MapPost("/messages", async (HttpContext context) =>
{
    using var reader = new StreamReader(context.Request.Body);
    var json = await reader.ReadToEndAsync();

    Console.WriteLine($"Raw message received: {json}");

    return Results.Ok();
});

app.Run();
import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys

app = flask.Flask(__name__)
CORS(app)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'deathStarStatus',
                      'route': '/dsstatus',
                      'metadata': {
                          'rawPayload': 'true',
                      } }]
    return jsonify(subscriptions)

@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'Content-Type':'application/json'}

app.run()
<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
    new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus', metadata: [ 'rawPayload' => 'true'] ),
]]));

$app->post('/dsstatus', function(
    #[\Dapr\Attributes\FromBody]
    \Dapr\PubSub\CloudEvent $cloudEvent,
    \Psr\Log\LoggerInterface $logger
    ) {
        $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
        return ['status' => 'SUCCESS'];
    }
);

$app->start();
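
Since raw payloads arrive base64-encoded, a handler has to decode them before use. The sketch below shows that step under one assumption: that the envelope carries the bytes in the data_base64 attribute defined by the CloudEvents JSON format. The envelope here is built by hand for illustration and the function name is hypothetical.

```python
import base64
import json


def decode_raw_payload(envelope):
    """Return the original message bytes from a base64-encoded envelope."""
    return base64.b64decode(envelope['data_base64'])


# Hypothetical envelope for the raw message {"order-number": "345"}.
raw = b'{"order-number": "345"}'
envelope = {
    'datacontenttype': 'application/octet-stream',
    'data_base64': base64.b64encode(raw).decode('ascii'),
}

# The decoded bytes happen to be JSON here; raw messages can be any format.
order = json.loads(decode_raw_payload(envelope))
print(order['order-number'])
```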

Declaratively subscribe to raw events

Similarly, you can subscribe to raw events declaratively by adding the rawPayload metadata entry to your subscription specification.

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: deathStarStatus
  routes: 
    default: /dsstatus
  pubsubname: pubsub
  metadata:
    isRawPayload: "true"
scopes:
- app1
- app2

5 - How-To: Route messages to different event handlers

Learn how to route messages from a topic to different event handlers based on CloudEvent fields

Pub/sub routing is an implementation of content-based routing, a messaging pattern that utilizes a DSL instead of imperative application code. With pub/sub routing, you use expressions to route CloudEvents (based on their contents) to different URIs/paths and event handlers in your application. If no route matches, then an optional default route is used. This proves useful as your applications expand to support multiple event versions or special cases.

While routing can be implemented with code, keeping routing rules external from the application can improve portability.

This feature is available to both the declarative and programmatic subscription approaches, but it does not apply to streaming subscriptions.

Declarative subscription

For declarative subscriptions, use dapr.io/v2alpha1 as the apiVersion. Here is an example of subscriptions.yaml using routing:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  pubsubname: pubsub
  topic: inventory
  routes:
    rules:
      - match: event.type == "widget"
        path: /widgets
      - match: event.type == "gadget"
        path: /gadgets
    default: /products
scopes:
  - app1
  - app2

Programmatic subscription

In the programmatic approach, the routes structure is returned instead of route. The JSON structure matches the declarative YAML:

import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys

app = flask.Flask(__name__)
CORS(app)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [
      {
        'pubsubname': 'pubsub',
        'topic': 'inventory',
        'routes': {
          'rules': [
            {
              'match': 'event.type == "widget"',
              'path': '/widgets'
            },
            {
              'match': 'event.type == "gadget"',
              'path': '/gadgets'
            },
          ],
          'default': '/products'
        }
      }]
    return jsonify(subscriptions)

@app.route('/products', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'Content-Type':'application/json'}

app.run()
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

app.get('/dapr/subscribe', (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "inventory",
      routes: {
        rules: [
          {
            match: 'event.type == "widget"',
            path: '/widgets'
          },
          {
            match: 'event.type == "gadget"',
            path: '/gadgets'
          },
        ],
        default: '/products'
      }
    }
  ]);
})

app.post('/products', (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
        [Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)]
        [HttpPost("widgets")]
        public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient)
        {
            // Logic
            return stock;
        }

        [Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)]
        [HttpPost("gadgets")]
        public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient)
        {
            // Logic
            return stock;
        }

        [Topic("pubsub", "inventory")]
        [HttpPost("products")]
        public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient)
        {
            // Logic
            return stock;
        }
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

const appPort = 3000

type subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	Routes     routes            `json:"routes"`
}

type routes struct {
	Rules   []rule `json:"rules,omitempty"`
	Default string `json:"default,omitempty"`
}

type rule struct {
	Match string `json:"match"`
	Path  string `json:"path"`
}

// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
	t := []subscription{
		{
			PubsubName: "pubsub",
			Topic:      "inventory",
			Routes: routes{
				Rules: []rule{
					{
						Match: `event.type == "widget"`,
						Path:  "/widgets",
					},
					{
						Match: `event.type == "gadget"`,
						Path:  "/gadgets",
					},
				},
				Default: "/products",
			},
		},
	}

	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(t)
}

func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}
<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
    new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'inventory', routes: (
      rules: => [
        ('match': 'event.type == "widget"', path: '/widgets'),
        ('match': 'event.type == "gadget"', path: '/gadgets'),
      ]
      default: '/products')),
]]));
$app->post('/products', function(
    #[\Dapr\Attributes\FromBody]
    \Dapr\PubSub\CloudEvent $cloudEvent,
    \Psr\Log\LoggerInterface $logger
    ) {
        $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
        return ['status' => 'SUCCESS'];
    }
);
$app->start();

Common Expression Language (CEL)

In these examples, depending on the event.type, the application will be called on:

  • /widgets
  • /gadgets
  • /products

The expressions are written as Common Expression Language (CEL) where event represents the cloud event. Any of the attributes from the CloudEvents core specification can be referenced in the expression.
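
One way to picture the routing behaviour is the small Python model below. It is not how Dapr is implemented: Dapr evaluates real CEL in the sidecar, whereas this sketch uses Python callables in place of the expressions. It also assumes rules are checked in the order listed and that the first match wins, which this page does not state explicitly. The select_path function is hypothetical.

```python
def select_path(routes, event):
    """Return the path of the first matching rule, else the default route."""
    for rule in routes.get('rules', []):
        if rule['match'](event):
            return rule['path']
    return routes.get('default')


# Python predicates stand in for the CEL expressions in the examples above.
routes = {
    'rules': [
        {'match': lambda e: e.get('type') == 'widget', 'path': '/widgets'},
        {'match': lambda e: e.get('type') == 'gadget', 'path': '/gadgets'},
    ],
    'default': '/products',
}

print(select_path(routes, {'type': 'gadget'}))  # /gadgets
print(select_path(routes, {'type': 'thing'}))   # /products
```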

Example expressions

Match “important” messages:

has(event.data.important) && event.data.important == true

Match deposits greater than $10,000:

event.type == "deposit" && int(event.data.amount) > 10000

Match multiple versions of a message:

event.type == "mymessage.v1"
event.type == "mymessage.v2"
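
To reason about or unit test these conditions outside of Dapr, the expressions can be mirrored as plain Python predicates. This is only an approximation of CEL semantics, and the function names are hypothetical.

```python
def is_important(event):
    # has(event.data.important) && event.data.important == true
    data = event.get('data')
    return isinstance(data, dict) and data.get('important') is True


def is_large_deposit(event):
    # event.type == "deposit" && int(event.data.amount) > 10000
    return event.get('type') == 'deposit' and int(event['data']['amount']) > 10000


def is_my_message(event):
    # Either version of the message type matches.
    return event.get('type') in ('mymessage.v1', 'mymessage.v2')
```

The isinstance check in is_important plays the role of has(): an event without a structured data payload simply does not match.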

CloudEvent attributes

For reference, the following attributes are from the CloudEvents specification.

Event Data

data

As defined by the term data, CloudEvents may include domain-specific information about the occurrence. When present, this information will be encapsulated within data.

  • Description: The event payload. This specification places no restriction on the information type. It is encoded into a media format, specified by the datacontenttype attribute (e.g. application/json), and adheres to the dataschema format when those respective attributes are present.
  • Constraints:
    • OPTIONAL

REQUIRED Attributes

The following attributes are required in all CloudEvents:

id

  • Type: String
  • Description: Identifies the event. Producers must ensure that source + id are unique for each distinct event. If a duplicate event is re-sent (e.g. due to a network error), it may have the same id. Consumers may assume that events with identical source and id are duplicates.
  • Constraints:
    • REQUIRED
    • Must be a non-empty string
    • Must be unique within the scope of the producer
  • Examples:
    • An event counter maintained by the producer
    • A UUID

source

  • Type: URI-reference

  • Description: Identifies the context in which an event happened. Often this includes information such as:

    • The type of the event source
    • The organization publishing the event
    • The process that produced the event

    The exact syntax and semantics behind the data encoded in the URI is defined by the event producer.

    Producers must ensure that source + id are unique for each distinct event.

    An application may:

    • Assign a unique source to each distinct producer, making it easier to produce unique IDs and preventing other producers from having the same source.
    • Use UUIDs, URNs, DNS authorities, or an application-specific scheme to create unique source identifiers.

    A source may include more than one producer. In this case, the producers must collaborate to ensure that source + id are unique for each distinct event.

  • Constraints:

    • REQUIRED
    • Must be a non-empty URI-reference
    • An absolute URI is RECOMMENDED
  • Examples:

    • Internet-wide unique URI with a DNS authority:
    • Universally-unique URN with a UUID:
      • urn:uuid:6e8bc430-9c3a-11d9-9669-0800200c9a66
    • Application-specific identifiers:
      • /cloudevents/spec/pull/123
      • /sensors/tn-1234567/alerts
      • 1-555-123-4567

specversion

  • Type: String

  • Description: The version of the CloudEvents specification used by the event. This enables the interpretation of the context. Compliant event producers must use a value of 1.0 when referring to this version of the specification.

    Currently, this attribute only includes the ‘major’ and ‘minor’ version numbers. This allows patch changes to the specification to be made without changing this property’s value in the serialization.

    Note: for ‘release candidate’ releases, a suffix might be used for testing purposes.

  • Constraints:

    • REQUIRED
    • Must be a non-empty string

type

  • Type: String
  • Description: Contains a value describing the event type related to the originating occurrence. Often, this attribute is used for routing, observability, policy enforcement, etc. The format is producer-defined and might include information like the version of the type. See Versioning of CloudEvents in the Primer for more information.
  • Constraints:
    • REQUIRED
    • Must be a non-empty string
    • Should be prefixed with a reverse-DNS name. The prefixed domain dictates the organization, which defines the semantics of this event type.
  • Examples:
    • com.github.pull_request.opened
    • com.example.object.deleted.v2

OPTIONAL Attributes

The following attributes are optional to appear in CloudEvents. See the Notational Conventions section for more information on the definition of OPTIONAL.

datacontenttype

  • Type: String per RFC 2046

  • Description: Content type of data value. This attribute enables data to carry any type of content, whereby format and encoding might differ from that of the chosen event format.

    For example, an event rendered using the JSON envelope format might carry an XML payload in data. The consumer is informed by this attribute being set to "application/xml".

    The rules for how data content is rendered for different datacontenttype values are defined in the event format specifications. For example, the JSON event format defines the relationship in section 3.1.

    For some binary mode protocol bindings, this field is directly mapped to the respective protocol’s content-type metadata property. You can find normative rules for the binary mode and the content-type metadata mapping in the respective protocol.

    In some event formats, you may omit the datacontenttype attribute. For example, if a JSON format event has no datacontenttype attribute, it’s implied that the data is a JSON value conforming to the "application/json" media type. In other words: a JSON-format event with no datacontenttype is exactly equivalent to one with datacontenttype="application/json".

    When translating an event message with no datacontenttype attribute to a different format or protocol binding, the target datacontenttype should be set explicitly to the implied datacontenttype of the source.

  • Constraints:

    • OPTIONAL
    • If present, must adhere to the format specified in RFC 2046
  • For Media Type examples, see IANA Media Types
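
The implied-default rule for JSON-format events can be expressed in a couple of lines. The following sketch assumes the event is a JSON-format CloudEvent already parsed into a dictionary. The function name is hypothetical.

```python
def effective_content_type(event):
    """Return the declared datacontenttype, or the JSON default when absent."""
    return event.get('datacontenttype') or 'application/json'


print(effective_content_type({'data': {'orderId': 1}}))
print(effective_content_type({'datacontenttype': 'application/xml', 'data': '<o/>'}))
```

When translating such an event to another format or protocol binding, writing this value out explicitly follows the recommendation above.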

dataschema

  • Type: URI
  • Description: Identifies the schema that data adheres to. Incompatible changes to the schema should be reflected by a different URI. See Versioning of CloudEvents in the Primer for more information.
  • Constraints:
    • OPTIONAL
    • If present, must be a non-empty URI

subject

  • Type: String

  • Description: This describes the event subject in the context of the event producer (identified by source). In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source. The source identifier alone might not be sufficient as a qualifier for any specific event if the source context has internal sub-structure.

    Identifying the subject of the event in context metadata (as opposed to only in the data payload) is helpful in generic subscription filtering scenarios, where middleware is unable to interpret the data content. In the example below, the subscriber might only be interested in blobs with names ending with ‘.jpg’ or ‘.jpeg’. With the subject attribute, you can construct a simple and efficient string-suffix filter for that subset of events.

  • Constraints:

    • OPTIONAL
    • If present, must be a non-empty string
  • Example:
    A subscriber might register interest for when new blobs are created inside a blob-storage container. In this case:

    • The event source identifies the subscription scope (storage container)
    • The event type identifies the “blob created” event
    • The event id uniquely identifies the event instance to distinguish separately created occurrences of a same-named blob.

    The name of the newly created blob is carried in subject.
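
The string-suffix filter mentioned in the description can be sketched as follows. The function name and the subject values are hypothetical and chosen only for illustration.

```python
IMAGE_SUFFIXES = ('.jpg', '.jpeg')


def is_jpeg_blob(event):
    """Filter on the subject attribute only, without reading the data payload."""
    return event.get('subject', '').endswith(IMAGE_SUFFIXES)


# Hypothetical events; only the subject attribute matters to the filter.
events = [
    {'subject': 'holiday.jpg'},
    {'subject': 'report.pdf'},
    {'subject': 'scan.jpeg'},
]
matching = [e['subject'] for e in events if is_jpeg_blob(e)]
print(matching)
```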

time

  • Type: Timestamp
  • Description: Timestamp of when the occurrence happened. If the time of the occurrence cannot be determined, then this attribute may be set to some other time (such as the current time) by the CloudEvents producer. However, all producers for the same source must be consistent in this respect. In other words, either they all use the actual time of the occurrence or they all use the same algorithm to determine the value used.
  • Constraints:
    • OPTIONAL
    • If present, must adhere to the format specified in RFC 3339
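
For producers and consumers written in Python, the standard library covers the common RFC 3339 form used in the payload example earlier on this page. The helper names below are hypothetical, and the sketch handles UTC timestamps with a trailing Z rather than every variant the RFC allows.

```python
from datetime import datetime, timezone


def to_rfc3339(moment):
    """Format an aware datetime as an RFC 3339 UTC timestamp ending in 'Z'."""
    return moment.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')


def from_rfc3339(text):
    """Parse a timestamp such as '2020-09-23T06:23:21Z' into an aware datetime."""
    # Older Python versions do not accept a trailing 'Z' in fromisoformat.
    return datetime.fromisoformat(text.replace('Z', '+00:00'))


parsed = from_rfc3339('2020-09-23T06:23:21Z')
print(to_rfc3339(parsed))  # 2020-09-23T06:23:21Z
```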

Community call demo

Watch this video on how to use message routing with pub/sub:

6 - Declarative, streaming, and programmatic subscription types

Learn more about the subscription types that allow you to subscribe to message topics.

Pub/sub API subscription types

Dapr applications can subscribe to published topics via three subscription types that support the same features: declarative, streaming and programmatic.

Subscription type | Description
------------------|------------
Declarative       | Subscription is defined in an external file. The declarative approach removes the Dapr dependency from your code and allows for existing applications to subscribe to topics, without having to change code.
Streaming         | Subscription is defined in the application code. Streaming subscriptions are dynamic, meaning they allow for adding or removing subscriptions at runtime. They do not require a subscription endpoint in your application (that is required by both programmatic and declarative subscriptions), making them easy to configure in code. Streaming subscriptions also do not require an app to be configured with the sidecar to receive messages.
Programmatic      | Subscription is defined in the application code. The programmatic approach implements the static subscription and requires an endpoint in your code.

The examples below demonstrate pub/sub messaging between a checkout app and an orderprocessing app via the orders topic. The examples demonstrate the same Dapr pub/sub component used first declaratively, then programmatically.

Declarative subscriptions

You can subscribe declaratively to a topic using an external component file. This example uses a YAML component file named subscription.yaml:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order
spec:
  topic: orders
  routes:
    default: /orders
  pubsubname: pubsub
scopes:
- orderprocessing

Here the subscription called order:

  • Uses the pub/sub component called pubsub to subscribe to the topic called orders.
  • Sets the route field to send all topic messages to the /orders endpoint in the app.
  • Sets the scopes field to restrict this subscription to apps with ID orderprocessing.

When running Dapr, set the YAML component file path to point Dapr to the component.

dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
dapr run --app-id myapp --resources-path ./myComponents -- npm start
dapr run --app-id myapp --resources-path ./myComponents -- go run app.go

In Kubernetes, apply the component to the cluster:

kubectl apply -f subscription.yaml

In your application code, subscribe to the topic specified in the Dapr pub/sub component.

 //Subscribe to a topic 
[HttpPost("orders")]
public void getCheckout([FromBody] int orderId)
{
    Console.WriteLine("Subscriber received : " + orderId);
}
import io.dapr.client.domain.CloudEvent;

 //Subscribe to a topic
@PostMapping(path = "/orders")
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
    return Mono.fromRunnable(() -> {
        try {
            log.info("Subscriber received: " + cloudEvent.getData());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}
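import json
import logging

from flask import Flask, request

app = Flask(__name__)

# Subscribe to a topic
@app.route('/orders', methods=['POST'])
def checkout():
    event = request.get_json()  # CloudEvent envelope read from the Flask request
    logging.info('Subscriber received: ' + str(event.get('data')))
    return json.dumps({'success': True}), 200, {'Content-Type': 'application/json'}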
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

// listen to the declarative route
app.post('/orders', (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});
//Subscribe to a topic
var sub = &common.Subscription{
	PubsubName: "pubsub",
	Topic:      "orders",
	Route:      "/orders",
}

func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
	log.Printf("Subscriber received: %s", e.Data)
	return false, nil
}

The /orders endpoint matches the route defined in the subscription; this is where Dapr sends all messages for the topic.

Streaming subscriptions

Streaming subscriptions are subscriptions defined in application code that can be dynamically stopped and started at runtime. Messages are pulled by the application from Dapr. This means no endpoint is needed to subscribe to a topic, and it’s possible to subscribe without any app configured on the sidecar at all. Any number of pubsubs and topics can be subscribed to at once. As messages are sent to the given message handler code, there is no concept of routes or bulk subscriptions.

Note: Each pubsub/topic pair can have only one active streaming subscription per application at a time.

The example below shows the different ways to stream subscribe to a topic.

You can use the SubscribeAsync method on the DaprPublishSubscribeClient to configure the message handler to use to pull messages from the stream.

using System.Text;
using Dapr.Messaging.PublishSubscribe;
using Dapr.Messaging.PublishSubscribe.Extensions;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprPubSubClient();
var app = builder.Build();

var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();

//Create a dynamic streaming subscription and subscribe with a timeout of 30 seconds and 10 seconds for message handling
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var subscription = await messagingClient.SubscribeAsync("pubsub", "myTopic",
    new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)),
    HandleMessageAsync, cancellationTokenSource.Token);

await Task.Delay(TimeSpan.FromMinutes(1));

//When you're done with the subscription, simply dispose of it
await subscription.DisposeAsync();
return;

//Process each message returned from the subscription
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
    try
    {
        //Do something with the message
        Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
        return Task.FromResult(TopicResponseAction.Success);
    }
    catch
    {
        return Task.FromResult(TopicResponseAction.Retry);
    }
}

Learn more about streaming subscriptions using the .NET SDK client.

You can use the subscribe method, which returns a Subscription object and allows you to pull messages from the stream by calling the next_message method. This call runs in the main thread and may block it while waiting for messages.

import time

from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError

counter = 0


def process_message(message):
    global counter
    counter += 1
    # Process the message here
    print(f'Processing message: {message.data()} from {message.topic()}...')
    return 'success'


def main():
    with DaprClient() as client:
        global counter

        subscription = client.subscribe(
            pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead'
        )

        try:
            while counter < 5:
                try:
                    message = subscription.next_message()

                except StreamInactiveError as e:
                    print('Stream is inactive. Retrying...')
                    time.sleep(1)
                    continue
                if message is None:
                    print('No message received within timeout period.')
                    continue

                # Process the message
                response_status = process_message(message)

                if response_status == 'success':
                    subscription.respond_success(message)
                elif response_status == 'retry':
                    subscription.respond_retry(message)
                elif response_status == 'drop':
                    subscription.respond_drop(message)

        finally:
            print("Closing subscription...")
            subscription.close()


if __name__ == '__main__':
    main()

You can also use the subscribe_with_handler method, which accepts a callback function executed for each message received from the stream. This runs in a separate thread, so it doesn’t block the main thread.

import time

from dapr.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse

counter = 0


def process_message(message):
    # Process the message here
    global counter
    counter += 1
    print(f'Processing message: {message.data()} from {message.topic()}...')
    return TopicEventResponse('success')


def main():
    with DaprClient() as client:
        # This will start a new thread that will listen for messages
        # and process them in the `process_message` function
        close_fn = client.subscribe_with_handler(
            pubsub_name='pubsub', topic='orders', handler_fn=process_message,
            dead_letter_topic='orders_dead'
        )

        while counter < 5:
            time.sleep(1)

        print("Closing subscription...")
        close_fn()


if __name__ == '__main__':
    main()

Learn more about streaming subscriptions using the Python SDK client.

package main

import (
	"context"
	"log"

	"github.com/dapr/go-sdk/client"
)

func main() {
	cl, err := client.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	sub, err := cl.Subscribe(context.Background(), client.SubscriptionOptions{
		PubsubName: "pubsub",
		Topic:      "orders",
	})
	if err != nil {
		panic(err)
	}
	// Close must always be called.
	defer sub.Close()

	for {
		msg, err := sub.Receive()
		if err != nil {
			panic(err)
		}

		// Process the event

		// We _MUST_ always signal the result of processing the message, else the
		// message will not be considered as processed and will be redelivered or
		// dead lettered.
		// msg.Retry()
		// msg.Drop()
		if err := msg.Success(); err != nil {
			panic(err)
		}
	}
}

or

package main

import (
	"context"
	"log"

	"github.com/dapr/go-sdk/client"
	"github.com/dapr/go-sdk/service/common"
)

func main() {
	cl, err := client.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	stop, err := cl.SubscribeWithHandler(context.Background(),
		client.SubscriptionOptions{
			PubsubName: "pubsub",
			Topic:      "orders",
		},
		eventHandler,
	)
	if err != nil {
		panic(err)
	}

	// Stop must always be called.
	defer stop()

	<-make(chan struct{})
}

func eventHandler(e *common.TopicEvent) common.SubscriptionResponseStatus {
	// Process the message here, then return one of:
	//   common.SubscriptionResponseStatusSuccess
	//   common.SubscriptionResponseStatusRetry
	//   common.SubscriptionResponseStatusDrop
	return common.SubscriptionResponseStatusSuccess
}

Demo

Watch this video for an overview on streaming subscriptions:

Programmatic subscriptions

The dynamic programmatic approach returns the routes JSON structure within the code, unlike the declarative approach’s route YAML structure.

Note: Programmatic subscriptions are only read once during application start-up. You cannot dynamically add new programmatic subscriptions; you can only add new ones at compile time.

In the example below, you define the values found in the declarative YAML subscription above within the application code.

[Topic("pubsub", "orders")]
[HttpPost("/orders")]
public async Task<ActionResult<Order>> Checkout(Order order, [FromServices] DaprClient daprClient)
{
    // Logic
    return order;
}

or

// Dapr subscription in [Topic] routes orders topic to this route
app.MapPost("/orders", [Topic("pubsub", "orders")] (Order order) => {
    Console.WriteLine("Subscriber received : " + order);
    return Results.Ok(order);
});

Both of the handlers defined above also need to be mapped to configure the dapr/subscribe endpoint. This is done in the application startup code while defining endpoints.

app.UseEndpoints(endpoints =>
{
    endpoints.MapSubscribeHandler();
});
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Topic(name = "orders", pubsubName = "pubsub")
@PostMapping(path = "/orders")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
  return Mono.fromRunnable(() -> {
    try {
      System.out.println("Subscriber received: " + cloudEvent.getData());
      System.out.println("Subscriber received: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  });
}

import json

from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    subscriptions = [
      {
        'pubsubname': 'pubsub',
        'topic': 'orders',
        'routes': {
          'rules': [
            {
              'match': 'event.type == "order"',
              'path': '/orders'
            },
          ],
          'default': '/orders'
        }
      }]
    return jsonify(subscriptions)

@app.route('/orders', methods=['POST'])
def ds_subscriber():
    print(request.json, flush=True)
    return json.dumps({'success':True}), 200, {'Content-Type':'application/json'}
app.run()
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

app.get('/dapr/subscribe', (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "orders",
      routes: {
        rules: [
          {
            match: 'event.type == "order"',
            path: '/orders'
          },
        ],
        default: '/orders'
      }
    }
  ]);
})

app.post('/orders', (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

const appPort = 3000

type subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	Routes     routes            `json:"routes"`
}

type routes struct {
	Rules   []rule `json:"rules,omitempty"`
	Default string `json:"default,omitempty"`
}

type rule struct {
	Match string `json:"match"`
	Path  string `json:"path"`
}

// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
	t := []subscription{
		{
			PubsubName: "pubsub",
			Topic:      "orders",
			Routes: routes{
				Rules: []rule{
					{
						Match: `event.type == "order"`,
						Path:  "/orders",
					},
				},
				Default: "/orders",
			},
		},
	}

	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(t)
}

func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}

7 - Dead Letter Topics

Use subscription dead letter topics to forward undeliverable messages

Introduction

There are times when applications might not be able to handle messages for a variety of reasons. For example, there could be transient issues retrieving the data needed to process a message, or the app's business logic could fail and return an error. Dead letter topics are used to forward messages that cannot be delivered to a subscribing app. This eases the pressure on the app by freeing it from dealing with these failed messages, allowing developers to write code that reads from the dead letter topic and either fixes the message and resends it, or abandons it completely.

Dead letter topics are typically used along with a retry resiliency policy and a dead letter subscription that handles the required logic for dealing with the messages forwarded from the dead letter topic.

When a dead letter topic is set, any message that failed to be delivered to an app for a configured topic is put on the dead letter topic to be forwarded to a subscription that handles these messages. This could be the same app or a completely different one.

Dapr enables dead letter topics for all of its pub/sub components, even if the underlying system does not support this feature natively. For example, the AWS SNS component has a dead letter queue and RabbitMQ has dead letter topics. You need to ensure that you configure components like these appropriately.

The diagram below is an example of how dead letter topics work. First, a message is sent from a publisher on an orders topic. Dapr receives the message on behalf of a subscriber application. However, the orders topic message fails to be delivered to the /checkout endpoint on the application, even after retries. As a result of the failure to deliver, the message is forwarded to the poisonMessages topic, which delivers it to the /failedMessages endpoint to be processed, in this case on the same application. The failedMessages processing code could drop the message or resend a new message.
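The flow described above can be sketched as a small in-memory simulation. This is only an illustration of the idea, not how Dapr implements it: `FakeBroker`, `deliver`, and `failing_checkout` are hypothetical names, and waiting between retries is omitted.

```python
from collections import defaultdict


class FakeBroker:
    """In-memory stand-in for a message broker (hypothetical, for illustration)."""

    def __init__(self):
        self.topics = defaultdict(list)

    def publish(self, topic, message):
        self.topics[topic].append(message)


def deliver(broker, message, handler, max_retries, dead_letter_topic=None):
    """Call the handler once, then retry up to max_retries times.

    Returns "delivered", "dead-lettered", or "failed".
    """
    for _attempt in range(1 + max_retries):
        try:
            handler(message)
            return "delivered"
        except Exception:
            continue  # a real retry policy would wait between attempts
    if dead_letter_topic is not None:
        # All attempts failed: forward the message to the dead letter topic.
        broker.publish(dead_letter_topic, message)
        return "dead-lettered"
    return "failed"


def failing_checkout(message):
    """Simulates a /checkout endpoint that always fails."""
    raise RuntimeError("checkout is unavailable")


broker = FakeBroker()
status = deliver(broker, {"orderId": 1}, failing_checkout,
                 max_retries=3, dead_letter_topic="poisonMessages")
print(status, broker.topics["poisonMessages"])
```

A separate subscriber reading from `poisonMessages` would then decide whether to fix and resend the message or to abandon it.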

Configuring a dead letter topic with a declarative subscription

The following YAML shows how to configure a subscription with a dead letter topic named poisonMessages for messages consumed from the orders topic. This subscription is scoped to an app with the ID checkout.

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order
spec:
  topic: orders
  routes: 
    default: /checkout
  pubsubname: pubsub
  deadLetterTopic: poisonMessages
scopes:
- checkout

Configuring a dead letter topic with a streaming subscription

	var deadLetterTopic = "poisonMessages"
	sub, err := cl.Subscribe(context.Background(), client.SubscriptionOptions{
		PubsubName:      "pubsub",
		Topic:           "orders",
		DeadLetterTopic: &deadLetterTopic,
	})

Configuring a dead letter topic with programmatic subscription

The JSON returned from the /dapr/subscribe endpoint shows how to configure a dead letter topic named poisonMessages for messages consumed from the orders topic.

app.get('/dapr/subscribe', (_req, res) => {
    res.json([
        {
            pubsubname: "pubsub",
            topic: "orders",
            route: "/checkout",
            deadLetterTopic: "poisonMessages"
        }
    ]);
});

Retries and dead letter topics

By default, when a dead letter topic is set, any failing message immediately goes to the dead letter topic. As a result, it is recommended to always have a retry policy set when using dead letter topics in a subscription. To enable the retry of a message before sending it to the dead letter topic, apply a retry resiliency policy to the pub/sub component.

This example shows how to set a constant retry policy named pubsubRetry, with a maximum of 10 retries applied every 5 seconds, for the pubsub pub/sub component.

apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: myresiliency
spec:
  policies:
    retries:
      pubsubRetry:
        policy: constant
        duration: 5s
        maxRetries: 10
  targets:
    components:
      pubsub:
        inbound:
          retry: pubsubRetry

Configuring a subscription for handling the dead letter topics

Remember to configure a subscription to handle the dead letter topic. For example, you can create another declarative subscription to receive these messages on the same or a different application. The example below shows the checkout application subscribing to the poisonMessages topic with another subscription and sending these messages to be handled by the /failedMessages endpoint.

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: deadlettertopics
spec:
  topic: poisonMessages
  routes:
    rules:
      - match:
        path: /failedMessages
  pubsubname: pubsub
scopes:
- checkout

Demo

Watch this video for an overview of the dead letter topics:

8 - How to: Set up pub/sub namespace consumer groups

Learn how to use the metadata-based namespace consumer group in your component

You’ve set up Dapr’s pub/sub API building block, and your applications are publishing and subscribing to topics smoothly, using a centralized message broker. What if you’d like to perform simple A/B testing, blue/green deployments, or even canary deployments for your applications? Even when using Dapr, this can prove difficult.

Dapr solves multi-tenancy at scale with its pub/sub namespace consumer groups construct.

Without namespace consumer groups

Let’s say you have a Kubernetes cluster, with two applications (App1 and App2) deployed to the same namespace (namespace-a). App2 publishes to a topic called order, while App1 subscribes to the topic called order. This will create two consumer groups, named after your applications (App1 and App2).

Diagram showing basic pubsub process.

In order to perform simple testing and deployments while using a centralized message broker, you create another namespace with two applications of the same app-id, App1 and App2.

Dapr creates consumer groups using the app-id of individual applications, so the consumer group names will remain App1 and App2.

Diagram showing complications around multi-tenancy without Dapr namespace consumer groups.

To avoid this, you’d then need to have something “creep” into your code to change the app-id, depending on the namespace in which you’re running. This workaround is cumbersome and a significant pain point.

With namespace consumer groups

Besides letting you change the behavior of a consumer group by basing the consumerID on a UUID or Pod name, Dapr provides a namespace construct that lives in the pub/sub component metadata. For example, using Redis as your message broker:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: consumerID
    value: "{namespace}"

By configuring consumerID with the {namespace} value, you’ll be able to use the same app-id with the same topics from different namespaces.

Diagram showing how namespace consumer groups help with multi-tenancy.

In the diagram above, you have two namespaces, each with applications of the same app-id, publishing and subscribing to the orders topic on the same centralized message broker. This time, however, Dapr has created consumer group names prefixed with the namespace in which they’re running.

Without you needing to change your code/app-id, the namespace consumer group allows you to:

  • Add more namespaces
  • Keep the same topics
  • Keep the same app-id across namespaces
  • Have your entire deployment pipeline remain intact

Simply include the "{namespace}" consumer group construct in your component metadata. You don’t need to encode the namespace in the metadata. Dapr understands the namespace it is running in and completes the namespace value for you, like a dynamic metadata value injected by the runtime.
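As a rough sketch of the effect, the snippet below expands the marker the way the runtime might. It assumes the resolved value joins the namespace and the app-id with a dot; that format and the function name `resolve_consumer_id` are assumptions made for illustration, not taken from this page.

```python
def resolve_consumer_id(template, namespace, app_id):
    """Expand the {namespace} marker in a consumerID metadata value.

    Assumption: the resolved value is "<namespace>.<app-id>".
    """
    return template.replace("{namespace}", f"{namespace}.{app_id}")


# The same app-id deployed to two namespaces yields two distinct consumer groups.
for ns in ("namespace-a", "namespace-b"):
    print(resolve_consumer_id("{namespace}", ns, "App1"))
```

Because the namespace becomes part of the consumer group name, the two deployments no longer compete for the same messages.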

Demo

Watch this video for an overview on pub/sub multi-tenancy:

Next steps

  • Learn more about configuring pub/sub components with multiple namespaces.

9 - How to: Horizontally scale subscribers with StatefulSets

Learn how to subscribe with StatefulSet and scale horizontally with consistent consumer IDs

Unlike Deployments, where Pods are ephemeral, StatefulSets allow deployment of stateful applications on Kubernetes by keeping a sticky identity for each Pod.

Below is an example of a StatefulSet with Dapr:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: python-subscriber
spec:
  selector:
    matchLabels:
      app: python-subscriber  # has to match .spec.template.metadata.labels
  serviceName: "python-subscriber"
  replicas: 3
  template:
    metadata:
      labels:
        app: python-subscriber # has to match .spec.selector.matchLabels
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "python-subscriber"
        dapr.io/app-port: "5001"
    spec:
      containers:
      - name: python-subscriber
        image: ghcr.io/dapr/samples/pubsub-python-subscriber:latest
        ports:
        - containerPort: 5001
        imagePullPolicy: Always

When subscribing to a pub/sub topic via Dapr, the application can define the consumerID, which determines the subscriber’s position in the queue or topic. With the sticky Pod identity that StatefulSets provide, you can have a unique consumerID per Pod, which lets you scale the subscriber application horizontally. Dapr keeps track of the name of each Pod, which can be used when declaring components using the {podName} marker.
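To make the sticky identity concrete, the sketch below lists the Pod names Kubernetes assigns to a StatefulSet (`<name>-<ordinal>`, starting at 0) and the consumerID each Pod would get from a `{podName}` marker. The helper names are hypothetical, and the marker expansion is modeled as a plain string replacement.

```python
def statefulset_pod_names(name, replicas):
    """Kubernetes names StatefulSet Pods <name>-<ordinal>, starting at 0."""
    return [f"{name}-{ordinal}" for ordinal in range(replicas)]


def consumer_ids(template, pod_names):
    """Expand the {podName} marker once per Pod."""
    return [template.replace("{podName}", pod) for pod in pod_names]


pods = statefulset_pod_names("python-subscriber", 3)
print(consumer_ids("{podName}", pods))
```

A restarted Pod keeps its ordinal, so it comes back with the same consumerID.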

On scaling the number of subscribers of a given topic, each Dapr component has unique settings that determine the behavior. Usually, there are two options for multiple consumers:

  • Broadcast: each message published to the topic will be consumed by all subscribers.
  • Shared: a message is consumed by any subscriber (but not all).
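The difference between the two options can be illustrated with a short simulation. It is a simplified model: the round-robin choice in `shared` is an assumption made for the example, and real brokers decide for themselves which subscriber receives a message.

```python
from itertools import cycle


def broadcast(messages, subscribers):
    """Every subscriber receives every message."""
    return {sub: list(messages) for sub in subscribers}


def shared(messages, subscribers):
    """Each message goes to exactly one subscriber (round-robin here)."""
    received = {sub: [] for sub in subscribers}
    for message, sub in zip(messages, cycle(subscribers)):
        received[sub].append(message)
    return received


subs = ["python-subscriber-0", "python-subscriber-1"]
msgs = ["m1", "m2", "m3", "m4"]
print(broadcast(msgs, subs))
print(shared(msgs, subs))
```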

Kafka isolates each subscriber by consumerID with its own position in the topic. When an instance restarts, it reuses the same consumerID and continues from its last known position, without skipping messages. The component below demonstrates how a Kafka component can be used by multiple Pods:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
  - name: consumerID
    value: "{podName}"
  - name: authRequired
    value: "false"

The MQTT3 protocol has shared topics, allowing multiple subscribers to “compete” for messages from the topic, meaning a message is only processed by one of them. For example:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mqtt-pubsub
spec:
  type: pubsub.mqtt3
  version: v1
  metadata:
    - name: consumerID
      value: "{podName}"
    - name: cleanSession
      value: "true"
    - name: url
      value: "tcp://admin:public@localhost:1883"
    - name: qos
      value: 1
    - name: retain
      value: "false"

10 - Scope Pub/sub topic access

Use scopes to limit pub/sub topics to specific applications

Introduction

Namespaces or component scopes can be used to limit component access to particular applications. When application scopes are added to a component, only applications with the listed IDs can use the component.

In addition to this general component scope, the following can be limited for pub/sub components:

  • Which topics can be used (published or subscribed)
  • Which applications are allowed to publish to specific topics
  • Which applications are allowed to subscribe to specific topics

This is called pub/sub topic scoping.

Pub/sub scopes are defined for each pub/sub component. You may have a pub/sub component named pubsub that has one set of scopes, and another pubsub2 with a different set.

To use this topic scoping, four metadata properties can be set for a pub/sub component:

  • spec.metadata.publishingScopes
    • A semicolon-separated list of applications & comma-separated topic lists, allowing that app to publish to that list of topics
    • If nothing is specified in publishingScopes (default behavior), all apps can publish to all topics
    • To deny an app the ability to publish to any topic, leave the topics list blank (app1=;app2=topic2)
    • For example, app1=topic1;app2=topic2,topic3;app3= will allow app1 to publish to topic1 and nothing else, app2 to publish to topic2 and topic3 only, and app3 to publish to nothing.
  • spec.metadata.subscriptionScopes
    • A semicolon-separated list of applications & comma-separated topic lists, allowing that app to subscribe to that list of topics
    • If nothing is specified in subscriptionScopes (default behavior), all apps can subscribe to all topics
    • For example, app1=topic1;app2=topic2,topic3 will allow app1 to subscribe to topic1 only and app2 to subscribe to topic2 and topic3
  • spec.metadata.allowedTopics
    • A comma-separated list of allowed topics for all applications.
    • If allowedTopics is not set (default behavior), all topics are valid. subscriptionScopes and publishingScopes still take effect if present.
    • publishingScopes or subscriptionScopes can be used in conjunction with allowedTopics to add granular limitations
  • spec.metadata.protectedTopics
    • A comma-separated list of protected topics for all applications.
    • If a topic is marked as protected then an application must be explicitly granted publish or subscribe permissions through publishingScopes or subscriptionScopes to publish/subscribe to it.
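The rules above can be modeled with a few lines of Python. This is a simplified sketch of the behavior as described in the list, not Dapr's implementation. `parse_scopes`, `parse_topics`, and `is_allowed` are hypothetical helpers, and `scopes` stands for either publishingScopes or subscriptionScopes depending on the operation being checked.

```python
def parse_scopes(value):
    """Parse "app1=topic1;app2=topic2,topic3;app3=" into {app: set_of_topics}."""
    scopes = {}
    for entry in filter(None, value.split(";")):
        app, _, topics = entry.partition("=")
        scopes[app.strip()] = {t.strip() for t in topics.split(",") if t.strip()}
    return scopes


def parse_topics(value):
    """Parse a comma-separated topic list into a set."""
    return {t.strip() for t in value.split(",") if t.strip()}


def is_allowed(app, topic, scopes="", allowed_topics="", protected_topics=""):
    """Return True if `app` may use `topic` for one operation (publish or subscribe)."""
    allowed = parse_topics(allowed_topics)
    if allowed and topic not in allowed:
        return False  # allowedTopics is set and does not contain the topic
    app_scopes = parse_scopes(scopes)
    if app in app_scopes:
        # A listed app may use only the topics in its list (an empty list denies all).
        return topic in app_scopes[app]
    # An unlisted app may use any topic that is not protected.
    return topic not in parse_topics(protected_topics)


publishing = "app1=topic1;app2=topic2,topic3;app3="
print(is_allowed("app1", "topic1", publishing))  # True
print(is_allowed("app3", "topic1", publishing))  # False
```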

These metadata properties can be used for all pub/sub components. The following examples use Redis as pub/sub component.

Example 1: Scope topic access

Limiting which applications can publish/subscribe to topics can be useful if you have topics which contain sensitive information and only a subset of your applications are allowed to publish or subscribe to these.

It can also be used for all topics, so that you always have a “ground truth” for which applications are using which topics as publishers/subscribers.

Here is an example of three applications and three topics:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: publishingScopes
    value: "app1=topic1;app2=topic2,topic3;app3="
  - name: subscriptionScopes
    value: "app2=;app3=topic1"

The table below shows which applications are allowed to publish into the topics:

       topic1   topic2   topic3
app1   yes      no       no
app2   no       yes      yes
app3   no       no       no

The table below shows which applications are allowed to subscribe to the topics:

       topic1   topic2   topic3
app1   yes      yes      yes
app2   no       no       no
app3   yes      no       no

Note: If an application is not listed (e.g. app1 in subscriptionScopes) it is allowed to subscribe to all topics. Because allowedTopics is not used and app1 does not have any subscription scopes, it can also use additional topics not listed above.

Example 2: Limit allowed topics

A topic is created if a Dapr application sends a message to it. In some scenarios this topic creation should be governed. For example:

  • A bug in a Dapr application when generating the topic name can lead to an unlimited number of topics being created
  • You want to streamline the topic names and total count and prevent unlimited growth of topics

In these situations allowedTopics can be used.

Here is an example of three allowed topics:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: allowedTopics
    value: "topic1,topic2,topic3"

All applications can use these topics, but only these topics; no others are allowed.

Example 3: Combine allowedTopics and scopes

Sometimes you want to combine both approaches: a fixed set of allowed topics plus scoping for certain applications.

Here is an example of three applications and two topics:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: allowedTopics
    value: "A,B"
  - name: publishingScopes
    value: "app1=A"
  - name: subscriptionScopes
    value: "app1=;app2=A"

Note: The third application is not listed, because if an app is not specified inside the scopes, it is allowed to use all topics.

The table below shows which application is allowed to publish into the topics:

       A     B     C
app1   yes   no    no
app2   yes   yes   no
app3   yes   yes   no

The table below shows which application is allowed to subscribe to the topics:

       A     B     C
app1   no    no    no
app2   yes   no    no
app3   yes   yes   no

Example 4: Mark topics as protected

If your topic involves sensitive data, each new application must be explicitly listed in the publishingScopes and subscriptionScopes to ensure it cannot read from or write to that topic. Alternatively, you can designate the topic as ‘protected’ (using protectedTopics) and grant access only to specific applications that genuinely require it.

Here is an example of three applications and three topics, two of which are protected:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  - name: redisPassword
    value: ""
  - name: protectedTopics
    value: "A,B"
  - name: publishingScopes
    value: "app1=A,B;app2=B"
  - name: subscriptionScopes
    value: "app1=A,B;app2=B"

In the example above, topics A and B are marked as protected. As a result, even though app3 is not listed under publishingScopes or subscriptionScopes, it cannot interact with these topics.

The table below shows which application is allowed to publish into the topics:

       A     B     C
app1   yes   yes   no
app2   no    yes   no
app3   no    no    yes

The table below shows which application is allowed to subscribe to the topics:

       A     B     C
app1   yes   yes   no
app2   no    yes   no
app3   no    no    yes

11 - Message Time-to-Live (TTL)

Use time-to-live in pub/sub messages.

Introduction

Dapr enables per-message time-to-live (TTL). This means that applications can set time-to-live per message, and subscribers do not receive those messages after expiration.

All Dapr pub/sub components are compatible with message TTL, as Dapr handles the TTL logic within the runtime. Simply set the ttlInSeconds metadata when publishing a message.

In some components, such as Kafka, time-to-live can be configured in the topic via retention.ms as per documentation. With message TTL in Dapr, applications using Kafka can now set time-to-live per message in addition to per topic.

Native message TTL support

When message time-to-live has native support in the pub/sub component, Dapr simply forwards the time-to-live configuration without adding any extra logic, keeping predictable behavior. This is helpful when the expired messages are handled differently by the component, for example with Azure Service Bus, where expired messages are stored in the dead letter queue and are not simply deleted.

Supported components

Azure Service Bus

Azure Service Bus supports entity level time-to-live. This means that messages have a default time-to-live but can also be set with a shorter timespan at publishing time. Dapr propagates the time-to-live metadata for the message and lets Azure Service Bus handle the expiration directly.

Non-Dapr subscribers

If messages are consumed by subscribers not using Dapr, the expired messages are not automatically dropped, as expiration is handled by the Dapr runtime when a Dapr sidecar receives a message. However, subscribers can programmatically drop expired messages by adding logic to handle the expiration attribute in the cloud event, which follows the RFC3339 format.

When non-Dapr subscribers use components such as Azure Service Bus, which natively handle message TTL, they do not receive expired messages. Here, no extra logic is needed.
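For a non-Dapr subscriber that needs to drop expired messages itself, a minimal check might look like the sketch below. It assumes the cloud event has already been parsed into a dict and carries the `expiration` attribute as an RFC3339 timestamp with a UTC offset; `is_expired` is a hypothetical helper name.

```python
from datetime import datetime, timezone


def is_expired(cloud_event, now=None):
    """Return True if the event carries an `expiration` timestamp in the past."""
    expiration = cloud_event.get("expiration")
    if expiration is None:
        return False  # no TTL was set on this message
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11.
    expires_at = datetime.fromisoformat(expiration.replace("Z", "+00:00"))
    now = now or datetime.now(timezone.utc)
    return now >= expires_at


event = {"id": "1", "data": {"order-number": "345"},
         "expiration": "2020-01-01T00:00:00Z"}
if is_expired(event):
    print("dropping expired message", event["id"])
```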

Example

Message TTL can be set in the metadata as part of the publishing request:

curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.ttlInSeconds=120 -H "Content-Type: application/json" -d '{"order-number": "345"}'

import json

from dapr.clients import DaprClient

with DaprClient() as d:
    req_data = {
        'order-number': '345'
    }
    # Create a typed message with content type and body
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic='TOPIC_A',
        data=json.dumps(req_data),
        publish_metadata={'ttlInSeconds': '120'}
    )
    # Print the request
    print(req_data, flush=True)
<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory) {
    $publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
    $publisher->topic('TOPIC_A')->publish('data', ['ttlInSeconds' => '120']);
});

See this guide for a reference on the pub/sub API.

12 - Publish and subscribe to bulk messages

Learn how to use the bulk publish and subscribe APIs in Dapr.

With the bulk publish and subscribe APIs, you can publish and subscribe to multiple messages in a single request. When writing applications that need to send or receive a large number of messages, using bulk operations allows achieving high throughput by reducing the overall number of requests between the Dapr sidecar, the application, and the underlying pub/sub broker.

Publishing messages in bulk

Restrictions when publishing messages in bulk

The bulk publish API allows you to publish multiple messages to a topic in a single request. It is non-transactional, i.e., from a single bulk request, some messages can succeed and some can fail. If any of the messages fail to publish, the bulk publish operation returns a list of failed messages.

The bulk publish operation also does not guarantee any ordering of messages.
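Because a bulk request can partially fail, callers usually need to work out which entries to resend. The sketch below shows one way to do that. It assumes the failure response is a JSON object with a `failedEntries` list whose items carry an `entryId`; those field names, the sample response, and the helper `entries_to_retry` are assumptions for illustration.

```python
def entries_to_retry(entries, response_body):
    """Return the entries whose entryId appears in the response's failedEntries."""
    failed_ids = {f["entryId"] for f in response_body.get("failedEntries", [])}
    return [e for e in entries if e["entryId"] in failed_ids]


entries = [
    {"entryId": "1", "event": "first text message", "contentType": "text/plain"},
    {"entryId": "2", "event": {"message": "second JSON message"},
     "contentType": "application/json"},
]
# Hypothetical response reporting that only entry "2" failed.
response_body = {"failedEntries": [{"entryId": "2", "error": "broker unavailable"}]}
print(entries_to_retry(entries, response_body))
```

Since ordering is not guaranteed either, resending only the failed entries does not change any ordering contract.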

Example

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;

class BulkPublisher {
  private static final String PUBSUB_NAME = "my-pubsub-name";
  private static final String TOPIC_NAME = "topic-a";

  public void publishMessages() {
    try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
      // Create a list of messages to publish
      List<String> messages = new ArrayList<>();
      for (int i = 0; i < 10; i++) {
        String message = String.format("This is message #%d", i);
        messages.add(message);
      }

      // Publish list of messages using the bulk publish API
      BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
    }
  }
}

import { DaprClient } from "@dapr/dapr";

const pubSubName = "my-pubsub-name";
const topic = "topic-a";

async function start() {
    const client = new DaprClient();

    // Publish multiple messages to a topic.
    await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);

    // Publish multiple messages to a topic with explicit bulk publish messages.
    const bulkPublishMessages = [
        {
            entryID: "entry-1",
            contentType: "application/json",
            event: { hello: "foo message 1" },
        },
        {
            entryID: "entry-2",
            contentType: "application/cloudevents+json",
            event: {
                specversion: "1.0",
                source: "/some/source",
                type: "example",
                id: "1234",
                data: "foo message 2",
                datacontenttype: "text/plain",
            },
        },
        {
            entryID: "entry-3",
            contentType: "text/plain",
            event: "foo message 3",
        },
    ];
    await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
using System;
using System.Collections.Generic;
using Dapr.Client;

const string PubsubName = "my-pubsub-name";
const string TopicName = "topic-a";
IReadOnlyList<object> BulkPublishData = new List<object>() {
    new { Id = "17", Amount = 10m },
    new { Id = "18", Amount = 20m },
    new { Id = "19", Amount = 30m }
};

using var client = new DaprClientBuilder().Build();

var res = await client.BulkPublishEventAsync(PubsubName, TopicName, BulkPublishData);
if (res == null) {
    throw new Exception("null response from dapr");
}
if (res.FailedEntries.Count > 0)
{
    Console.WriteLine("Some events failed to be published!");
    foreach (var failedEntry in res.FailedEntries)
    {
        Console.WriteLine("EntryId: " + failedEntry.Entry.EntryId + " Error message: " +
                          failedEntry.ErrorMessage);
    }
}
else
{
    Console.WriteLine("Published all events!");
}
import requests
import json

base_url = "http://localhost:3500/v1.0-alpha1/publish/bulk/{}/{}"
pubsub_name = "my-pubsub-name"
topic_name = "topic-a"
payload = [
  {
    "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
    "event": "first text message",
    "contentType": "text/plain"
  },
  {
    "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
    "event": {
      "message": "second JSON message"
    },
    "contentType": "application/json"
  }
]

response = requests.post(base_url.format(pubsub_name, topic_name), json=payload)
print(response.status_code)
package main

import (
  "fmt"
  "strings"
  "net/http"
  "io/ioutil"
)

const (
  pubsubName = "my-pubsub-name"
  topicName = "topic-a"
  baseUrl = "http://localhost:3500/v1.0-alpha1/publish/bulk/%s/%s"
)

func main() {
  url := fmt.Sprintf(baseUrl, pubsubName, topicName)
  method := "POST"
  payload := strings.NewReader(`[
        {
            "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
            "event":  "first text message",
            "contentType": "text/plain"
        },
        {
            "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
            "event":  {
                "message": "second JSON message"
            },
            "contentType": "application/json"
        }
]`)

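  client := &http.Client{}
  req, err := http.NewRequest(method, url, payload)
  if err != nil {
    panic(err)
  }

  req.Header.Add("Content-Type", "application/json")
  res, err := client.Do(req)
  if err != nil {
    panic(err)
  }
  defer res.Body.Close()

  // Print the status and any response body, such as details of failed entries.
  body, err := ioutil.ReadAll(res.Body)
  if err != nil {
    panic(err)
  }
  fmt.Println(res.Status, string(body))
}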
curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a \
  -H 'Content-Type: application/json' \
  -d '[
        {
            "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
            "event":  "first text message",
            "contentType": "text/plain"
        },
        {
            "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
            "event":  {
                "message": "second JSON message"
            },
            "contentType": "application/json"
        }
      ]'
Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a' `
-Body '[
        {
            "entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
            "event":  "first text message",
            "contentType": "text/plain"
        },
        {
            "entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
            "event":  {
                "message": "second JSON message"
            },
            "contentType": "application/json"
        }
      ]'

Subscribing messages in bulk

The bulk subscribe API allows you to subscribe to multiple messages from a topic in a single request. As described in How to: Publish & Subscribe to topics, there are three ways to subscribe to topic(s):

  • Declaratively - subscriptions are defined in an external file.
  • Programmatically - subscriptions are defined in code.
  • Streaming - Not supported for bulk subscribe as messages are sent to handler code.

To bulk subscribe to a topic, add the bulkSubscribe attribute to the subscription spec, as in the following example:

apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: order-pub-sub
spec:
  topic: orders
  routes:
    default: /checkout
  pubsubname: order-pub-sub
  bulkSubscribe:
    enabled: true
    maxMessagesCount: 100
    maxAwaitDurationMs: 40
scopes:
- orderprocessing
- checkout

In the example above, bulkSubscribe is optional. If you use bulkSubscribe, then:

  • enabled is mandatory and enables or disables bulk subscriptions on this topic.
  • maxMessagesCount is optional and sets the maximum number of messages delivered in a single bulk message. For components that do not support bulk subscribe, the default is 100; this applies to the default bulk delivery between the app and Dapr. For components that support bulk subscribe, the default is listed in that component's documentation. See How components handle publishing and subscribing to bulk messages.
  • maxAwaitDurationMs is optional and sets the maximum time, in milliseconds, to wait before a bulk message is sent to the app. For components that do not support bulk subscribe, the default is 1000; this applies to the default bulk delivery between the app and Dapr. For components that support bulk subscribe, the default is listed in that component's documentation. See How components handle publishing and subscribing to bulk messages.
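
One way to picture how maxMessagesCount and maxAwaitDurationMs interact is the simplified Python model below. It is not Dapr's implementation: the function name group_into_bulk_messages is hypothetical, and the rule that the wait window starts at the first message of each batch is an assumption made for illustration.

```python
def group_into_bulk_messages(arrivals, max_messages_count=100, max_await_duration_ms=1000):
    """Group (arrival_ms, message) pairs, sorted by arrival time, into batches.

    Simplified model (an assumption, not Dapr's code): a batch is closed when
    it holds max_messages_count messages, or when the next message arrives
    max_await_duration_ms or more after the first message of the batch.
    """
    batches = []
    current = []
    batch_start_ms = None
    for arrival_ms, message in arrivals:
        # The wait window of the open batch has elapsed: close it first.
        if current and arrival_ms - batch_start_ms >= max_await_duration_ms:
            batches.append(current)
            current = []
        if not current:
            batch_start_ms = arrival_ms
        current.append(message)
        # The batch is full: close it immediately.
        if len(current) == max_messages_count:
            batches.append(current)
            current = []
    if current:
        batches.append(current)
    return batches


arrivals = [(0, "a"), (10, "b"), (20, "c"), (100, "d"), (105, "e")]
print(group_into_bulk_messages(arrivals, max_messages_count=3, max_await_duration_ms=40))
# prints [['a', 'b', 'c'], ['d', 'e']]
```

In this model a smaller maxMessagesCount or maxAwaitDurationMs produces more, smaller bulk messages, while larger values trade some delivery delay for fewer calls to the app.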

The application receives an EntryId associated with each entry (individual message) in the bulk message. This EntryId must be used by the app to communicate the status of that particular entry. If the app fails to notify on an EntryId status, it’s considered a RETRY.

A JSON-encoded payload body with the processing status against each entry needs to be sent:

{
  "statuses":
  [
    {
    "entryId": "<entryId1>",
    "status": "<status>"
    },
    {
    "entryId": "<entryId2>",
    "status": "<status>"
    }
  ]
}

Possible status values:

Status    Description
SUCCESS   Message is processed successfully
RETRY     Message to be retried by Dapr
DROP      Warning is logged and message is dropped

Refer to Expected HTTP Response for Bulk Subscribe for further insights on response.
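
As a rough sketch of how an app might assemble this response, the Python helper below maps the outcome of processing each entry to one of the three statuses. The names build_bulk_subscribe_response and DropMessage are hypothetical, and the policy of retrying on any unexpected exception is an assumption; only the response shape and the status values come from the description above.

```python
class DropMessage(Exception):
    """Hypothetical exception a handler raises to have an entry dropped."""


def build_bulk_subscribe_response(entries, handler):
    """Run handler on each entry's event and report a status per entryId."""
    statuses = []
    for entry in entries:
        try:
            handler(entry.get("event"))
            status = "SUCCESS"
        except DropMessage:
            # The handler decided this entry should not be retried.
            status = "DROP"
        except Exception:
            # Assumption: any other failure is worth a retry by Dapr.
            status = "RETRY"
        statuses.append({"entryId": entry["entryId"], "status": status})
    return {"statuses": statuses}
```

Because a status is appended for every entry, no entryId is left unreported, which avoids the implicit RETRY that applies to entries missing from the response.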

Example

The following code examples demonstrate how to use Bulk Subscribe.

import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.List;

class BulkSubscriber {
  @BulkSubscribe()
  // @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 40)
  @Topic(name = "topicbulk", pubsubName = "orderPubSub")
  @PostMapping(path = "/topicbulk")
  public Mono<BulkSubscribeAppResponse> handleBulkMessage(
          @RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
    return Mono.fromCallable(() -> {
      List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
      for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
        try {
          CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
          System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
          entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
        } catch (Exception e) {
          e.printStackTrace();
          entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
        }
      }
      return new BulkSubscribeAppResponse(entries);
    });
  }
}

import { DaprServer } from "@dapr/dapr";

const pubSubName = "orderPubSub";
const topic = "topicbulk";

const daprHost = process.env.DAPR_HOST || "127.0.0.1";
const daprPort = process.env.DAPR_HTTP_PORT || "3502";
const serverHost = process.env.SERVER_HOST || "127.0.0.1";
const serverPort = process.env.APP_PORT || 5001;

async function start() {
    const server = new DaprServer({
        serverHost,
        serverPort,
        clientOptions: {
            daprHost,
            daprPort,
        },
    });

    // Subscribe to a topic in bulk with the default config.
    await server.pubsub.bulkSubscribeWithDefaultConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)));

    // Subscribe to a topic in bulk with a specific maxMessagesCount and maxAwaitDurationMs.
    await server.pubsub.bulkSubscribeWithConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)), 100, 40);

    // Start the server once the subscriptions are registered.
    await server.start();
}

using Microsoft.AspNetCore.Mvc;
using Dapr.AspNetCore;
using Dapr;

namespace DemoApp.Controllers;

[ApiController]
[Route("[controller]")]
public class BulkMessageController : ControllerBase
{
    private readonly ILogger<BulkMessageController> logger;

    public BulkMessageController(ILogger<BulkMessageController> logger)
    {
        this.logger = logger;
    }

    [BulkSubscribe("messages", 10, 10)]
    [Topic("pubsub", "messages")]
    public ActionResult<BulkSubscribeAppResponse> HandleBulkMessages([FromBody] BulkSubscribeMessage<BulkMessageModel<BulkMessageModel>> bulkMessages)
    {
        List<BulkSubscribeAppResponseEntry> responseEntries = new List<BulkSubscribeAppResponseEntry>();
        logger.LogInformation($"Received {bulkMessages.Entries.Count()} messages");
        foreach (var message in bulkMessages.Entries)
        {
            try
            {
                logger.LogInformation($"Received a message with data '{message.Event.Data.MessageData}'");
                responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
            }
            catch (Exception e)
            {
                logger.LogError(e.Message);
                responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.RETRY));
            }
        }
        return new BulkSubscribeAppResponse(responseEntries);
    }
    public class BulkMessageModel
    {
        public string MessageData { get; set; }
    }
}

Currently, you can only bulk subscribe in Python using an HTTP client.

import json
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    # Define the bulk subscribe configuration
    subscriptions = [{
        "pubsubname": "pubsub",
        "topic": "TOPIC_A",
        "route": "/checkout",
        "bulkSubscribe": {
            "enabled": True,
            "maxMessagesCount": 3,
            "maxAwaitDurationMs": 40
        }
    }]
    print('Dapr pub/sub is subscribed to: ' + json.dumps(subscriptions))
    return jsonify(subscriptions)


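# Define the endpoint that handles incoming bulk messages
@app.route('/checkout', methods=['POST'])
def checkout():
    bulk_message = request.get_json(silent=True) or {}
    statuses = []
    # Each item in "entries" is an individual message with its own entryId
    for entry in bulk_message.get('entries', []):
        print(f"Received message: {entry.get('event')}")
        statuses.append({'entryId': entry.get('entryId'), 'status': 'SUCCESS'})
    # Report a processing status for every entry in the bulk message
    return jsonify({'statuses': statuses})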

if __name__ == '__main__':
    app.run(port=5000)

How components handle publishing and subscribing to bulk messages

Event publish/subscribe involves two kinds of network transfers:

  1. Between the app and Dapr.
  2. Between Dapr and the pub/sub broker.

Both transfers are opportunities for optimization. When optimized, messages are sent in bulk requests, which reduces the overall number of calls and therefore increases throughput and improves latency.

On enabling Bulk Publish and/or Bulk Subscribe, the communication between the App and Dapr sidecar (Point 1 above) is optimized for all components.

Optimization from Dapr sidecar to the pub/sub broker depends on a number of factors, for example:

  • Broker must inherently support Bulk pub/sub
  • The Dapr component must be updated to support the use of bulk APIs provided by the broker

Currently, the following components are updated to support this level of optimization:

Component           Bulk Publish   Bulk Subscribe
Kafka               Yes            Yes
Azure Service Bus   Yes            Yes
Azure Event Hubs    Yes            Yes

Demos

Watch the following demos and presentations about bulk pub/sub.

KubeCon Europe 2023 presentation

Dapr Community Call #77 presentation