More about Dapr Pub/sub
Learn more about how to use Dapr Pub/sub:
- Try the Pub/sub quickstart.
- Explore pub/sub via any of the supporting Dapr SDKs.
- Review the Pub/sub API reference documentation.
- Browse the supported pub/sub component specs.
Publish and subscribe (pub/sub) enables microservices to communicate with each other using messages for event-driven architectures.
An intermediary message broker copies each message from a publisher’s input channel to an output channel for all subscribers interested in that message. This pattern is especially useful when you need to decouple microservices from one another.
The pub/sub API in Dapr:
- Provides a platform-agnostic API to send and receive messages.
- Offers at-least-once message delivery guarantees.
- Integrates with various message brokers and queuing systems.
The specific message broker used by your service is pluggable and configured as a Dapr pub/sub component at runtime. This removes the dependency from your service and makes your service more portable and flexible to changes.
When using pub/sub in Dapr:
1. Your service makes a network call to a Dapr pub/sub building block API.
1. The pub/sub building block makes a call into a Dapr pub/sub component that encapsulates a specific message broker.
1. To receive messages on a topic, Dapr subscribes to the pub/sub component on behalf of your service with a topic and delivers the messages to an endpoint on your service when they arrive.
The following overview video and demo demonstrate how Dapr pub/sub works.
In the diagram below, a “shipping” service and an “email” service have both subscribed to topics published by a “cart” service. Each service loads pub/sub component configuration files that point to the same pub/sub message broker component; for example: Redis Streams, NATS Streaming, Azure Service Bus, or GCP pub/sub.
In the diagram below, the Dapr API posts an “order” topic from the publishing “cart” service to “order” endpoints on the “shipping” and “email” subscribing services.
View the complete list of pub/sub components that Dapr supports.
The pub/sub API building block brings several features to your application.
To enable message routing and provide additional context with each message between services, Dapr uses the CloudEvents 1.0 specification as its message format. Any message sent by an application to a topic using Dapr is automatically wrapped in a CloudEvents envelope, using the `Content-Type` header value for the `datacontenttype` attribute.
For more information, read about messaging with CloudEvents, or sending raw messages without CloudEvents.
If one of your applications uses Dapr while another doesn’t, you can disable the CloudEvent wrapping for a publisher or subscriber. This allows partial adoption of Dapr pub/sub in applications that cannot adopt Dapr all at once.
For more information, read how to use pub/sub without CloudEvents.
When publishing a message, it's important to specify the content type of the data being sent. Unless specified, Dapr will assume `text/plain`. When using Dapr's HTTP API, the content type can be set in a `Content-Type` header.

In principle, Dapr considers a message successfully delivered once the subscriber processes the message and responds with a non-error response. For more granular control, Dapr's pub/sub API also provides explicit statuses, defined in the response payload, with which the subscriber indicates specific handling instructions to Dapr (for example, `RETRY` or `DROP`).
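For instance, an HTTP subscriber can return one of these statuses in its response body. A minimal sketch, assuming a Flask app with a hypothetical `/orders` route:

```python
from flask import Flask, jsonify

app = Flask(__name__)

# Hypothetical subscriber endpoint; route and logic are illustrative only.
@app.route('/orders', methods=['POST'])
def orders_handler():
    try:
        # ... process the incoming message here ...
        return jsonify({'status': 'SUCCESS'})
    except Exception:
        # Ask Dapr to redeliver the message later instead of marking it done.
        return jsonify({'status': 'RETRY'})
```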
Dapr applications can subscribe to published topics via three subscription types that support the same features: declarative, streaming and programmatic.
Subscription type | Description |
---|---|
Declarative | The subscription is defined in an external file. The declarative approach removes the Dapr dependency from your code and allows for existing applications to subscribe to topics, without having to change code. |
Streaming | The subscription is defined in the user code. Streaming subscriptions are dynamic, meaning they allow for adding or removing subscriptions at runtime. They do not require a subscription endpoint in your application (that is required by both programmatic and declarative subscriptions), making them easy to configure in code. Streaming subscriptions also do not require an app to be configured with the sidecar to receive messages. With streaming subscriptions, since messages are sent to a message handler code, there is no concept of routes or bulk subscriptions. |
Programmatic | Subscription is defined in the user code. The programmatic approach implements the static subscription and requires an endpoint in your code. |
For more information, read about subscriptions in Subscription Types.
To reload topic subscriptions that are defined programmatically or declaratively, the Dapr sidecar needs to be restarted.
The Dapr sidecar can be made to dynamically reload changed declarative topic subscriptions without restarting by enabling the `HotReload` feature gate.
Hot reloading of topic subscriptions is currently a preview feature.
In-flight messages are unaffected when reloading a subscription.
Dapr provides a content-based routing pattern. Pub/sub routing is an implementation of this pattern that allows developers to use expressions to route CloudEvents based on their contents to different URIs/paths and event handlers in your application. If no route matches, an optional default route is used. This is useful as your application expands to support multiple event versions or special cases.
This feature is available to both the declarative and programmatic subscription approaches.
For more information on message routing, read the Dapr pub/sub API reference.
Sometimes, messages can’t be processed because of a variety of possible issues, such as erroneous conditions within the producer or consumer application or an unexpected state change that causes an issue with your application code. Dapr allows developers to set dead letter topics to deal with messages that cannot be delivered to an application. This feature is available on all pub/sub components and prevents consumer applications from endlessly retrying a failed message. For more information, read about dead letter topics.
Dapr enables developers to use the outbox pattern for achieving a single transaction across a transactional state store and any message broker. For more information, read How to enable transactional outbox messaging.
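As a rough sketch, the outbox is enabled on a transactional state store component by pointing it at a pub/sub component and topic. The metadata keys below follow the outbox how-to; the component, connection, and topic names are placeholders:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: mystate
spec:
  type: state.mysql
  version: v1
  metadata:
  - name: connectionString
    value: "<CONNECTION STRING>"
  # Pub/sub component that delivers the outbox messages
  - name: outboxPublishPubsub
    value: "mypubsub"
  # Topic that state transactions are published to
  - name: outboxPublishTopic
    value: "newOrder"
```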
Dapr solves multi-tenancy at scale with namespaces for consumer groups. Simply include the `"{namespace}"` value in your component metadata for consumer groups to allow multiple namespaces with applications of the same `app-id` to publish and subscribe to the same message broker.
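For example, with a broker that supports consumer groups (Kafka is assumed here; the broker address and component name are placeholders), the component metadata might look like:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: "localhost:9092"
  # "{namespace}" resolves at runtime to the namespace the app runs in,
  # so the same app-id in different namespaces gets its own consumer group.
  - name: consumerGroup
    value: "{namespace}"
```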
Dapr guarantees at-least-once semantics for message delivery. When an application publishes a message to a topic using the pub/sub API, Dapr ensures the message is delivered at least once to every subscriber.
Even if the message fails to deliver, or your application crashes, Dapr attempts to redeliver the message until successful delivery.
All Dapr pub/sub components support the at-least-once guarantee.
Dapr handles the burden of dealing with consumer groups and the competing consumers pattern. In the competing consumers pattern, multiple application instances using a single consumer group compete for the message. Dapr enforces the competing consumer pattern when replicas use the same `app-id` without explicit consumer group overrides.

When multiple instances of the same application (with the same `app-id`) subscribe to a topic, Dapr delivers each message to only one instance of that application. This concept is illustrated in the diagram below.

Similarly, if two different applications (with different `app-id`s) subscribe to the same topic, Dapr delivers each message to only one instance of each application.
Not all Dapr pub/sub components support the competing consumer pattern. Currently, the following (non-exhaustive) pub/sub components support this:
By default, all topic messages associated with an instance of a pub/sub component are available to every application configured with that component. You can limit which application can publish or subscribe to topics with Dapr topic scoping. For more information, read: pub/sub topic scoping.
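As a sketch, scoping is expressed as metadata on the pub/sub component itself; the app IDs and topic names below are placeholders:

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: "localhost:6379"
  # Which apps may publish to which topics
  - name: publishingScopes
    value: "app1=topic1;app2=topic2,topic3"
  # Which apps may subscribe to which topics
  - name: subscriptionScopes
    value: "app2=topic1"
  # Topics that exist for this component at all
  - name: allowedTopics
    value: "topic1,topic2,topic3"
```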
Dapr can set a timeout on a per-message basis, meaning that if the message is not read from the pub/sub component, then the message is discarded. This prevents a buildup of unread messages. If a message has been in the queue longer than the configured TTL, it is marked as dead. For more information, read pub/sub message TTL.
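For example, a publisher can set a per-message TTL through publish metadata. A minimal sketch using the Python SDK (component and topic names are placeholders):

```python
import json
from dapr.clients import DaprClient

with DaprClient() as client:
    client.publish_event(
        pubsub_name='order-pub-sub',
        topic_name='orders',
        data=json.dumps({'orderId': 100}),
        # Discard the message if no subscriber reads it within 120 seconds.
        publish_metadata={'ttlInSeconds': '120'},
    )
```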
Dapr supports sending and receiving multiple messages in a single request. When writing applications that need to send or receive a large number of messages, using bulk operations achieves high throughput by reducing the overall number of requests. For more information, read pub/sub bulk messages.
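As a sketch of the HTTP shape, bulk publish is exposed as an alpha endpoint at the time of writing, so the path and entry fields may change:

```bash
curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/order-pub-sub/orders \
  -H "Content-Type: application/json" \
  -d '[
        {"entryId": "1", "event": {"orderId": "100"}, "contentType": "application/json"},
        {"entryId": "2", "event": {"orderId": "200"}, "contentType": "application/json"}
      ]'
```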
When running on Kubernetes, subscribers can have a sticky `consumerID` per instance when using StatefulSets in combination with the `{podName}` marker, as sketched below. See how to horizontally scale subscribers with StatefulSets.
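A minimal sketch of the relevant component metadata, assuming a broker that supports consumer IDs:

```yaml
metadata:
- name: consumerID
  # Resolves to the StatefulSet pod name, e.g. myapp-0, myapp-1, ...
  value: "{podName}"
```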
Want to put the Dapr pub/sub API to the test? Walk through the following quickstart and tutorials to see pub/sub in action:
Quickstart/tutorial | Description |
---|---|
Pub/sub quickstart | Send and receive messages using the publish and subscribe API. |
Pub/sub tutorial | Demonstrates how to use Dapr to enable pub-sub applications. Uses Redis as a pub-sub component. |
Want to skip the quickstarts? Not a problem. You can try out the pub/sub building block directly in your application to publish messages and subscribe to a topic. After Dapr is installed, you can begin using the pub/sub API starting with the pub/sub how-to guide.
Now that you’ve learned what the Dapr pub/sub building block provides, learn how it can work in your service. The below code example loosely describes an application that processes orders with two services, each with Dapr sidecars:
Dapr automatically wraps the user payload in a CloudEvents v1.0 compliant envelope, using the `Content-Type` header value for the `datacontenttype` attribute. Learn more about messages with CloudEvents.

The following example demonstrates how your applications publish and subscribe to a topic called `orders`.
The first step is to set up the pub/sub component:
When you run `dapr init`, Dapr creates a default Redis `pubsub.yaml` and runs a Redis container on your local machine, located at:
- Windows: `%UserProfile%\.dapr\components\pubsub.yaml`
- Linux/macOS: `~/.dapr/components/pubsub.yaml`

With the `pubsub.yaml` component, you can easily swap out underlying components without application code changes. In this example, RabbitMQ is used.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: order-pub-sub
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://localhost:5672"
- name: durable
value: "false"
- name: deletedWhenUnused
value: "false"
- name: autoAck
value: "false"
- name: reconnectWait
value: "0"
- name: concurrency
value: parallel
scopes:
- orderprocessing
- checkout
You can override this file with another pub/sub component by creating a components directory (in this example, `myComponents`) containing the file and using the flag `--resources-path` with the `dapr run` CLI command.

To deploy this into a Kubernetes cluster, fill in the `metadata` connection details of the pub/sub component in the YAML below, save as `pubsub.yaml`, and run `kubectl apply -f pubsub.yaml`.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: order-pub-sub
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: connectionString
value: "amqp://localhost:5672"
- name: protocol
value: amqp
- name: hostname
value: localhost
- name: username
value: username
- name: password
value: password
- name: durable
value: "false"
- name: deletedWhenUnused
value: "false"
- name: autoAck
value: "false"
- name: reconnectWait
value: "0"
- name: concurrency
value: parallel
scopes:
- orderprocessing
- checkout
dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
dapr run --app-id myapp --resources-path ./myComponents -- go run app.go
dapr run --app-id myapp --resources-path ./myComponents -- npm start
Dapr provides three methods by which you can subscribe to topics:
- Declaratively, where subscriptions are defined in an external file.
- Streaming, where subscriptions are defined in user code.
- Programmatically, where subscriptions are defined in user code.

Learn more in the declarative, streaming, and programmatic subscriptions doc. This example demonstrates a declarative subscription.
Create a file named `subscription.yaml` and paste the following:
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order-pub-sub
spec:
topic: orders
routes:
default: /checkout
pubsubname: order-pub-sub
scopes:
- orderprocessing
- checkout
The example above shows an event subscription to topic `orders`, for the pubsub component `order-pub-sub`.
- The `route` field tells Dapr to send all topic messages to the `/checkout` endpoint in the app.
- The `scopes` field enables this subscription for apps with IDs `orderprocessing` and `checkout`.

Place `subscription.yaml` in the same directory as your `pubsub.yaml` component. When Dapr starts up, it loads subscriptions along with the components.

By default, reloading topic subscriptions requires restarting the Dapr sidecar. The sidecar can be made to dynamically reload changed declarative topic subscriptions without restarting by enabling the `HotReload` feature gate. To prevent reprocessing or loss of unprocessed messages, in-flight messages between Dapr and your application are unaffected during hot reload events.

Below are code examples that leverage Dapr SDKs to subscribe to the topic you defined in `subscription.yaml`.
using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.Mvc;
using Dapr;
using Dapr.Client;
namespace CheckoutService.Controllers;
[ApiController]
public sealed class CheckoutServiceController : ControllerBase
{
    //Subscribe to a topic called "orders" from the "order-pub-sub" component
[Topic("order-pub-sub", "orders")]
[HttpPost("checkout")]
public void GetCheckout([FromBody] int orderId)
{
Console.WriteLine("Subscriber received : " + orderId);
}
}
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 --app-protocol https dotnet run
//dependencies
import io.dapr.Topic;
import io.dapr.client.domain.CloudEvent;
import org.springframework.web.bind.annotation.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
//code
@RestController
public class CheckoutServiceController {
private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
//Subscribe to a topic
@Topic(name = "orders", pubsubName = "order-pub-sub")
@PostMapping(path = "/checkout")
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
log.info("Subscriber received: " + cloudEvent.getData());
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 mvn spring-boot:run
#dependencies
from cloudevents.sdk.event import v1
from dapr.ext.grpc import App
import logging
import json
#code
app = App()
logging.basicConfig(level = logging.INFO)
#Subscribe to a topic
@app.subscribe(pubsub_name='order-pub-sub', topic='orders')
def mytopic(event: v1.Event) -> None:
data = json.loads(event.Data())
logging.info('Subscriber received: ' + str(data))
app.run(6002)
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --app-protocol grpc -- python3 CheckoutService.py
//dependencies
import (
"log"
"net/http"
"context"
"github.com/dapr/go-sdk/service/common"
daprd "github.com/dapr/go-sdk/service/http"
)
//code
var sub = &common.Subscription{
PubsubName: "order-pub-sub",
Topic: "orders",
Route: "/checkout",
}
func main() {
s := daprd.NewService(":6002")
//Subscribe to a topic
if err := s.AddTopicEventHandler(sub, eventHandler); err != nil {
log.Fatalf("error adding topic subscription: %v", err)
}
if err := s.Start(); err != nil && err != http.ErrServerClosed {
log.Fatalf("error listenning: %v", err)
}
}
func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("Subscriber received: %s", e.Data)
return false, nil
}
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 go run CheckoutService.go
//dependencies
import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr';
//code
const daprHost = "127.0.0.1";
const serverHost = "127.0.0.1";
const serverPort = "6002";
start().catch((e) => {
console.error(e);
process.exit(1);
});
async function start(orderId) {
const server = new DaprServer({
serverHost,
serverPort,
communicationProtocol: CommunicationProtocolEnum.HTTP,
clientOptions: {
daprHost,
daprPort: process.env.DAPR_HTTP_PORT,
},
});
//Subscribe to a topic
await server.pubsub.subscribe("order-pub-sub", "orders", async (orderId) => {
console.log(`Subscriber received: ${JSON.stringify(orderId)}`)
});
await server.start();
}
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the subscriber application:
dapr run --app-id checkout --app-port 6002 --dapr-http-port 3602 --dapr-grpc-port 60002 npm start
Start an instance of Dapr with an app-id called `orderprocessing`:
dapr run --app-id orderprocessing --dapr-http-port 3601
Then publish a message to the `orders` topic:
dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{"orderId": "100"}'
curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/json" -d '{"orderId": "100"}'
Invoke-RestMethod -Method Post -ContentType 'application/json' -Body '{"orderId": "100"}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'
Below are code examples that leverage Dapr SDKs to publish to a topic.
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using System.Threading;
const string PUBSUB_NAME = "order-pub-sub";
const string TOPIC_NAME = "orders";
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprClient();
var app = builder.Build();
var random = new Random();
var client = app.Services.GetRequiredService<DaprClient>();
while(true) {
await Task.Delay(TimeSpan.FromSeconds(5));
var orderId = random.Next(1,1000);
var source = new CancellationTokenSource();
var cancellationToken = source.Token;
//Using Dapr SDK to publish a topic
await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, orderId, cancellationToken);
Console.WriteLine("Published data: " + orderId);
}
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 --app-protocol https dotnet run
//dependencies
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Metadata;
import static java.util.Collections.singletonMap;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.TimeUnit;
//code
@SpringBootApplication
public class OrderProcessingServiceApplication {
private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);
public static void main(String[] args) throws InterruptedException{
String MESSAGE_TTL_IN_SECONDS = "1000";
String TOPIC_NAME = "orders";
String PUBSUB_NAME = "order-pub-sub";
while(true) {
TimeUnit.MILLISECONDS.sleep(5000);
Random random = new Random();
int orderId = random.nextInt(1000-1) + 1;
DaprClient client = new DaprClientBuilder().build();
//Using Dapr SDK to publish a topic
client.publishEvent(
PUBSUB_NAME,
TOPIC_NAME,
orderId,
singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
log.info("Published data:" + orderId);
}
}
}
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 mvn spring-boot:run
#dependencies
import random
from time import sleep
import requests
import logging
import json
from dapr.clients import DaprClient
#code
logging.basicConfig(level = logging.INFO)
while True:
sleep(random.randrange(50, 5000) / 1000)
orderId = random.randint(1, 1000)
PUBSUB_NAME = 'order-pub-sub'
TOPIC_NAME = 'orders'
with DaprClient() as client:
#Using Dapr SDK to publish a topic
result = client.publish_event(
pubsub_name=PUBSUB_NAME,
topic_name=TOPIC_NAME,
data=json.dumps(orderId),
data_content_type='application/json',
)
logging.info('Published data: ' + str(orderId))
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --app-protocol grpc -- python3 OrderProcessingService.py
//dependencies
import (
"context"
"log"
"math/rand"
"time"
"strconv"
dapr "github.com/dapr/go-sdk/client"
)
//code
var (
PUBSUB_NAME = "order-pub-sub"
TOPIC_NAME = "orders"
)
func main() {
for i := 0; i < 10; i++ {
		time.Sleep(5 * time.Second)
orderId := rand.Intn(1000-1) + 1
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
ctx := context.Background()
//Using Dapr SDK to publish a topic
		if err := client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId))); err != nil {
panic(err)
}
log.Println("Published data: " + strconv.Itoa(orderId))
}
}
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 go run OrderProcessingService.go
//dependencies
import { DaprServer, DaprClient, CommunicationProtocolEnum } from '@dapr/dapr';
const daprHost = "127.0.0.1";
var main = async function() {
	for(var i = 0; i < 10; i++) {
		await sleep(5000);
var orderId = Math.floor(Math.random() * (1000 - 1) + 1);
start(orderId).catch((e) => {
console.error(e);
process.exit(1);
});
}
}
async function start(orderId) {
const PUBSUB_NAME = "order-pub-sub"
const TOPIC_NAME = "orders"
const client = new DaprClient({
daprHost,
daprPort: process.env.DAPR_HTTP_PORT,
communicationProtocol: CommunicationProtocolEnum.HTTP
});
console.log("Published data:" + orderId)
//Using Dapr SDK to publish a topic
await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId);
}
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
main();
Navigate to the directory containing the above code, then run the following command to launch both a Dapr sidecar and the publisher application:
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 npm start
In order to tell Dapr that a message was processed successfully, return a `200 OK` response. If Dapr receives any return status code other than `200`, or if your app crashes, Dapr will attempt to redeliver the message following at-least-once semantics.
Watch this demo video to learn more about pub/sub messaging with Dapr.
To enable message routing and provide additional context with each message, Dapr uses the CloudEvents 1.0 specification as its message format. Any message sent by an application to a topic using Dapr is automatically wrapped in a CloudEvents envelope, using the `Content-Type` header value for the `datacontenttype` attribute.
Dapr uses CloudEvents to provide additional context to the event payload, enabling features like:
- Tracing
- Content-type for proper deserialization of the event data
- Verification of the sender application
You can choose any of three methods for publishing a CloudEvent via pub/sub:
1. Send a pub/sub event, which is then wrapped by Dapr in a CloudEvent envelope.
1. Replace specific CloudEvents attributes exposed by Dapr by overriding the standard CloudEvent properties.
1. Write your own CloudEvent envelope as part of the pub/sub event.
Sending a publish operation to Dapr automatically wraps it in a CloudEvent envelope containing the following fields:
- `id`
- `source`
- `specversion`
- `type`
- `traceparent`
- `traceid`
- `tracestate`
- `topic`
- `pubsubname`
- `time`
- `datacontenttype` (optional)

The following example demonstrates a CloudEvent generated by Dapr for a publish operation to the `orders` topic that includes:
- a `traceid` unique to the message
- the `data` and the fields for the CloudEvent where the data content is serialized as JSON

{
"topic": "orders",
"pubsubname": "order_pub_sub",
"traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
"tracestate": "",
"data": {
"orderId": 1
},
"id": "5929aaac-a5e2-4ca1-859c-edfe73f11565",
"specversion": "1.0",
"datacontenttype": "application/json; charset=utf-8",
"source": "checkout",
"type": "com.dapr.event.sent",
"time": "2020-09-23T06:23:21Z",
"traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}
As another example of a v1.0 CloudEvent, the following shows data as XML content in a CloudEvent message serialized as JSON:
{
"topic": "orders",
"pubsubname": "order_pub_sub",
"traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
"tracestate": "",
"data" : "<note><to></to><from>user2</from><message>Order</message></note>",
"id" : "id-1234-5678-9101",
"specversion" : "1.0",
"datacontenttype" : "text/xml",
"subject" : "Test XML Message",
"source" : "https://example.com/message",
"type" : "xml.message",
"time" : "2020-09-23T06:23:21Z"
}
Dapr automatically generates several CloudEvent properties. You can replace these generated CloudEvent properties by providing the following optional metadata key/value pairs:
- `cloudevent.id`: overrides `id`
- `cloudevent.source`: overrides `source`
- `cloudevent.type`: overrides `type`
- `cloudevent.traceid`: overrides `traceid`
- `cloudevent.tracestate`: overrides `tracestate`
- `cloudevent.traceparent`: overrides `traceparent`

The ability to replace CloudEvents properties using these metadata properties applies to all pub/sub components.

For example, to replace the `source` and `id` values from the CloudEvent example above in code:
with DaprClient() as client:
order = {'orderId': i}
# Publish an event/message using Dapr PubSub
result = client.publish_event(
pubsub_name='order_pub_sub',
topic_name='orders',
        data=json.dumps(order),
        publish_metadata={'cloudevent.id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317', 'cloudevent.source': 'payment'}
)
# or
cloud_event = {
'specversion': '1.0',
'type': 'com.example.event',
'source': 'payment',
'id': 'd99b228f-6c73-4e78-8c4d-3f80a043d317',
'data': {'orderId': i},
'datacontenttype': 'application/json',
...
}
# Set the data content type to 'application/cloudevents+json'
result = client.publish_event(
pubsub_name='order_pub_sub',
topic_name='orders',
data=json.dumps(cloud_event),
data_content_type='application/cloudevents+json',
)
var order = new Order(i);
using var client = new DaprClientBuilder().Build();
// Override cloudevent metadata
var metadata = new Dictionary<string,string>() {
{ "cloudevent.source", "payment" },
{ "cloudevent.id", "d99b228f-6c73-4e78-8c4d-3f80a043d317" }
};
// Publish an event/message using Dapr PubSub
await client.PublishEventAsync("order_pub_sub", "orders", order, metadata);
Console.WriteLine("Published data: " + order);
await Task.Delay(TimeSpan.FromSeconds(1));
The JSON payload then reflects the new `source` and `id` values:
{
"topic": "orders",
"pubsubname": "order_pub_sub",
"traceid": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01",
"tracestate": "",
"data": {
"orderId": 1
},
"id": "d99b228f-6c73-4e78-8c4d-3f80a043d317",
"specversion": "1.0",
"datacontenttype": "application/json; charset=utf-8",
"source": "payment",
"type": "com.dapr.event.sent",
"time": "2020-09-23T06:23:21Z",
"traceparent": "00-113ad9c4e42b27583ae98ba698d54255-e3743e35ff56f219-01"
}
While you can replace `traceid`/`traceparent` and `tracestate`, doing this may interfere with tracing events and report inconsistent results in tracing tools. It’s recommended to use Open Telemetry for distributed traces. Learn more about distributed tracing.
If you want to use your own CloudEvent, make sure to specify the `datacontenttype` as `application/cloudevents+json`.
If the CloudEvent that was authored by the app does not contain the minimum required fields in the CloudEvent specification, the message is rejected. Dapr adds the following fields to the CloudEvent if they are missing:
- `time`
- `traceid`
- `traceparent`
- `tracestate`
- `topic`
- `pubsubname`
- `source`
- `type`
- `specversion`

You can add additional fields to a custom CloudEvent that are not part of the official CloudEvent specification. Dapr will pass these fields as-is.
Publish a CloudEvent to the `orders` topic:
dapr publish --publish-app-id orderprocessing --pubsub order-pub-sub --topic orders --data '{\"orderId\": \"100\"}'
Publish a CloudEvent to the `orders` topic:
curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders -H "Content-Type: application/cloudevents+json" -d '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}'
Publish a CloudEvent to the `orders` topic:
Invoke-RestMethod -Method Post -ContentType 'application/cloudevents+json' -Body '{"specversion" : "1.0", "type" : "com.dapr.cloudevent.sent", "source" : "testcloudeventspubsub", "subject" : "Cloud Events Test", "id" : "someCloudEventId", "time" : "2021-08-02T09:00:00Z", "datacontenttype" : "application/cloudevents+json", "data" : {"orderId": "100"}}' -Uri 'http://localhost:3601/v1.0/publish/order-pub-sub/orders'
When using CloudEvents created by Dapr, the envelope contains an `id` field which can be used by the app to perform message deduplication. Dapr does not handle deduplication automatically. Dapr supports using message brokers that natively enable message deduplication.
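A minimal in-process sketch of id-based deduplication; production code would keep the seen IDs in a shared store (for example, a Dapr state store) rather than in memory:

```python
processed_ids: set[str] = set()

def handle_event(envelope: dict) -> dict:
    # The CloudEvent 'id' field uniquely identifies the message.
    event_id = envelope['id']
    if event_id in processed_ids:
        # Already handled: acknowledge so Dapr does not redeliver.
        return {'status': 'SUCCESS'}
    processed_ids.add(event_id)
    # ... process envelope['data'] here ...
    return {'status': 'SUCCESS'}
```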
When adding Dapr to your application, some services may still need to communicate via pub/sub messages not encapsulated in CloudEvents, due to either compatibility reasons or some apps not using Dapr. These are referred to as “raw” pub/sub messages. Dapr enables apps to publish and subscribe to raw events not wrapped in a CloudEvent for compatibility and to send data that is not JSON serializable.
Dapr apps are able to publish raw events to pub/sub topics without CloudEvent encapsulation, for compatibility with non-Dapr apps.
To disable CloudEvent wrapping, set the `rawPayload` metadata to `true` as part of the publishing request. This allows subscribers to receive these messages without having to parse the CloudEvent schema.
curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.rawPayload=true -H "Content-Type: application/json" -d '{"order-number": "345"}'
using Dapr.Client;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers().AddDapr();
var app = builder.Build();
app.MapPost("/publish", async (DaprClient daprClient) =>
{
var message = new Message(
Guid.NewGuid().ToString(),
$"Hello at {DateTime.UtcNow}",
DateTime.UtcNow
);
await daprClient.PublishEventAsync(
"pubsub", // pubsub name
"messages", // topic name
message, // message data
new Dictionary<string, string>
{
{ "rawPayload", "true" },
{ "content-type", "application/json" }
}
);
return Results.Ok(message);
});
app.Run();
import json
from dapr.clients import DaprClient
with DaprClient() as d:
req_data = {
'order-number': '345'
}
# Create a typed message with content type and body
resp = d.publish_event(
pubsub_name='pubsub',
topic_name='TOPIC_A',
data=json.dumps(req_data),
publish_metadata={'rawPayload': 'true'}
)
# Print the request
print(req_data, flush=True)
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory) {
$publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
$publisher->topic('TOPIC_A')->publish('data', ['rawPayload' => 'true']);
});
Dapr apps can subscribe to raw messages from pub/sub topics, even if they weren’t published as CloudEvents. However, the subscribing Dapr process still wraps these raw messages in a CloudEvent before delivering them to the subscribing application.
When subscribing programmatically, add the additional metadata entry for `rawPayload` to allow the subscriber to receive a message that is not wrapped by a CloudEvent. For .NET, this metadata entry is called `isRawPayload`.

When using raw payloads the message is always base64 encoded with content type `application/octet-stream`.
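A hedged sketch of decoding such a payload in a subscriber, assuming the base64 string arrives in the envelope's `data` field and the producer sent JSON:

```python
import base64
import json

def handle_raw(envelope: dict) -> None:
    # With rawPayload/isRawPayload, the wrapped data is base64 encoded
    # and the datacontenttype is application/octet-stream.
    raw_bytes = base64.b64decode(envelope['data'])
    message = json.loads(raw_bytes)  # assumption: the producer sent JSON
    print(message)
```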
using System.Text.Json;
using System.Text.Json.Serialization;
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();
app.MapGet("/dapr/subscribe", () =>
{
var subscriptions = new[]
{
new
{
pubsubname = "pubsub",
topic = "messages",
route = "/messages",
metadata = new Dictionary<string, string>
{
{ "isRawPayload", "true" },
{ "content-type", "application/json" }
}
}
};
return Results.Ok(subscriptions);
});
app.MapPost("/messages", async (HttpContext context) =>
{
using var reader = new StreamReader(context.Request.Body);
var json = await reader.ReadToEndAsync();
Console.WriteLine($"Raw message received: {json}");
return Results.Ok();
});
app.Run();
import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys
app = flask.Flask(__name__)
CORS(app)
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [{'pubsubname': 'pubsub',
'topic': 'deathStarStatus',
'route': 'dsstatus',
'metadata': {
'rawPayload': 'true',
} }]
return jsonify(subscriptions)
@app.route('/dsstatus', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'deathStarStatus', route: '/dsstatus', metadata: [ 'rawPayload' => 'true'] ),
]]));
$app->post('/dsstatus', function(
#[\Dapr\Attributes\FromBody]
\Dapr\PubSub\CloudEvent $cloudEvent,
\Psr\Log\LoggerInterface $logger
) {
$logger->alert('Received event: {event}', ['event' => $cloudEvent]);
return ['status' => 'SUCCESS'];
}
);
$app->start();
Similarly, you can subscribe to raw events declaratively by adding the `rawPayload` metadata entry to your subscription specification.
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
topic: deathStarStatus
routes:
default: /dsstatus
pubsubname: pubsub
metadata:
isRawPayload: "true"
scopes:
- app1
- app2
Pub/sub routing is an implementation of content-based routing, a messaging pattern that utilizes a DSL instead of imperative application code. With pub/sub routing, you use expressions to route CloudEvents (based on their contents) to different URIs/paths and event handlers in your application. If no route matches, then an optional default route is used. This proves useful as your applications expand to support multiple event versions or special cases.
While routing can be implemented with code, keeping routing rules external from the application can improve portability.
This feature is available to both the declarative and programmatic subscription approaches; however, it does not apply to streaming subscriptions.
For declarative subscriptions, use `dapr.io/v2alpha1` as the `apiVersion`. Here is an example of `subscriptions.yaml` using routing:
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: myevent-subscription
spec:
pubsubname: pubsub
topic: inventory
routes:
rules:
- match: event.type == "widget"
path: /widgets
- match: event.type == "gadget"
path: /gadgets
default: /products
scopes:
- app1
- app2
In the programmatic approach, the `routes` structure is returned instead of `route`. The JSON structure matches the declarative YAML:
import flask
from flask import request, jsonify
from flask_cors import CORS
import json
import sys
app = flask.Flask(__name__)
CORS(app)
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [
{
'pubsubname': 'pubsub',
'topic': 'inventory',
'routes': {
'rules': [
{
'match': 'event.type == "widget"',
'path': '/widgets'
},
{
'match': 'event.type == "gadget"',
'path': '/gadgets'
},
],
'default': '/products'
}
}]
return jsonify(subscriptions)
@app.route('/products', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
const port = 3000
app.get('/dapr/subscribe', (req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "inventory",
routes: {
rules: [
{
match: 'event.type == "widget"',
path: '/widgets'
},
{
match: 'event.type == "gadget"',
path: '/gadgets'
},
],
default: '/products'
}
}
]);
})
app.post('/products', (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
[Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)]
[HttpPost("widgets")]
public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient)
{
// Logic
return stock;
}
[Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)]
[HttpPost("gadgets")]
public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient)
{
// Logic
return stock;
}
[Topic("pubsub", "inventory")]
[HttpPost("products")]
public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient)
{
// Logic
return stock;
}
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
)
const appPort = 3000
type subscription struct {
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata,omitempty"`
Routes routes `json:"routes"`
}
type routes struct {
Rules []rule `json:"rules,omitempty"`
Default string `json:"default,omitempty"`
}
type rule struct {
Match string `json:"match"`
Path string `json:"path"`
}
// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
t := []subscription{
{
PubsubName: "pubsub",
Topic: "inventory",
Routes: routes{
Rules: []rule{
{
Match: `event.type == "widget"`,
Path: "/widgets",
},
{
Match: `event.type == "gadget"`,
Path: "/gadgets",
},
},
Default: "/products",
},
},
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(t)
}
func main() {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
    new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'inventory', routes: [
        'rules' => [
            ['match' => 'event.type == "widget"', 'path' => '/widgets'],
            ['match' => 'event.type == "gadget"', 'path' => '/gadgets'],
        ],
        'default' => '/products',
    ]),
]]));
$app->post('/products', function(
#[\Dapr\Attributes\FromBody]
\Dapr\PubSub\CloudEvent $cloudEvent,
\Psr\Log\LoggerInterface $logger
) {
$logger->alert('Received event: {event}', ['event' => $cloudEvent]);
return ['status' => 'SUCCESS'];
}
);
$app->start();
In these examples, depending on the `event.type`, the application will be called on:
- `/widgets`
- `/gadgets`
- `/products`
The expressions are written as Common Expression Language (CEL), where `event` represents the cloud event. Any of the attributes from the CloudEvents core specification can be referenced in the expression.
Match “important” messages:
has(event.data.important) && event.data.important == true
Match deposits greater than $10,000:
event.type == "deposit" && int(event.data.amount) > 10000
If `event.data.amount` is not cast as an integer, the match is not performed. For more information, see the CEL documentation.
Match multiple versions of a message:
event.type == "mymessage.v1"
event.type == "mymessage.v2"
For reference, the following attributes are from the CloudEvents specification.
As defined by the term data, CloudEvents may include domain-specific information about the occurrence. When present, this information is encapsulated within `data`. The data is encoded into a media format specified by the `datacontenttype` attribute (e.g. `application/json`), and adheres to the `dataschema` format when those respective attributes are present.
format when those respective attributes are present.The following attributes are required in all CloudEvents:
String
source
+ id
are unique for each distinct event. If a duplicate event is re-sent (e.g. due
to a network error), it may have the same id
. Consumers may assume that
events with identical source
and id
are duplicates.Type: URI-reference
Description: Identifies the context in which an event happened. Often this includes information such as:
The exact syntax and semantics behind the data encoded in the URI is defined by the event producer.
Producers must ensure that `source` + `id` are unique for each distinct event.
An application may:
- Assign a unique `source` to each distinct producer, making it easier to produce unique IDs and preventing other producers from having the same `source`.
- Use UUIDs, URNs, DNS authorities, or an application-specific scheme to create unique `source` identifiers.

A source may include more than one producer. In this case, the producers must collaborate to ensure that `source` + `id` are unique for each distinct event.
Constraints:
Examples:
specversion
Type: `String`
Description: The version of the CloudEvents specification used by the event. This enables the interpretation of the context. Compliant event producers must use a value of `1.0` when referring to this version of the specification.
Currently, this attribute only includes the ‘major’ and ‘minor’ version numbers. This allows patch changes to the specification to be made without changing this property’s value in the serialization.
Note: for ‘release candidate’ releases, a suffix might be used for testing purposes.
Constraints:
type
Type: `String`
Description: Describes the type of event related to the originating occurrence. This attribute might include the version of the `type`. See Versioning of CloudEvents in the Primer for more information.
. See Versioning of CloudEvents in the Primer for more information.The following attributes are optional to appear in CloudEvents. See the Notational Conventions section for more information on the definition of OPTIONAL.
datacontenttype
Type: `String` per RFC 2046
Description: Content type of the `data` value. This attribute enables `data` to carry any type of content, whereby format and encoding might differ from that of the chosen event format.
For example, an event rendered using the JSON envelope format might carry an XML payload in `data`. The consumer is informed by this attribute being set to `"application/xml"`.
The rules for how `data` content is rendered for different `datacontenttype` values are defined in the event format specifications. For example, the JSON event format defines the relationship in section 3.1.
For some binary mode protocol bindings, this field is directly mapped to the respective protocol’s content-type metadata property. You can find normative rules for the binary mode and the content-type metadata mapping in the respective protocol.
In some event formats, you may omit the `datacontenttype` attribute. For example, if a JSON format event has no `datacontenttype` attribute, it’s implied that the `data` is a JSON value conforming to the `"application/json"` media type. In other words: a JSON-format event with no `datacontenttype` is exactly equivalent to one with `datacontenttype="application/json"`.
When translating an event message with no `datacontenttype` attribute to a different format or protocol binding, the target `datacontenttype` should be set explicitly to the implied `datacontenttype` of the source.
Constraints:
For Media Type examples, see IANA Media Types
dataschema
Type: `URI`
Description: Identifies the schema that `data` adheres to. Incompatible changes to the schema should be reflected by a different URI. See Versioning of CloudEvents in the Primer for more information.
adheres to. Incompatible changes to the schema should be reflected by a different URI. See Versioning of CloudEvents in the Primer for more information.Type: String
Description: This describes the event subject in the context of the event producer (identified by source
). In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source
. The source
identifier alone might not be sufficient as a qualifier for any specific event if the source
context has internal sub-structure.
Identifying the subject of the event in context metadata (opposed to only in the `data` payload) is helpful in generic subscription filtering scenarios, where middleware is unable to interpret the `data` content. In the above example, the subscriber might only be interested in blobs with names ending with ‘.jpg’ or ‘.jpeg’. With the `subject` attribute, you can construct a simple and efficient string-suffix filter for that subset of events.
Constraints:
Example:
A subscriber might register interest for when new blobs are created inside a blob-storage container. In this case:
- `source` identifies the subscription scope (storage container)
- `type` identifies the “blob created” event
- `id` uniquely identifies the event instance to distinguish separately created occurrences of a same-named blob

The name of the newly created blob is carried in `subject`:
- `source`: https://example.com/storage/tenant/container
- `subject`: mynewfile.jpg

time
Type: `Timestamp`
Description: Timestamp of when the occurrence happened. All producers for the same `source` must be consistent in this respect. In other words, either they all use the actual time of the occurrence or they all use the same algorithm to determine the value used.

Watch this video on how to use message routing with pub/sub:
Dapr applications can subscribe to published topics via three subscription types that support the same features: declarative, streaming and programmatic.
Subscription type | Description |
---|---|
Declarative | Subscription is defined in an external file. The declarative approach removes the Dapr dependency from your code and allows for existing applications to subscribe to topics, without having to change code. |
Streaming | Subscription is defined in the application code. Streaming subscriptions are dynamic, meaning they allow for adding or removing subscriptions at runtime. They do not require a subscription endpoint in your application (that is required by both programmatic and declarative subscriptions), making them easy to configure in code. Streaming subscriptions also do not require an app to be configured with the sidecar to receive messages. |
Programmatic | Subscription is defined in the application code. The programmatic approach implements the static subscription and requires an endpoint in your code. |
The examples below demonstrate pub/sub messaging between a `checkout` app and an `orderprocessing` app via the `orders` topic. The examples demonstrate the same Dapr pub/sub component used first declaratively, then programmatically.
By default, reloading topic subscriptions requires restarting the Dapr sidecar. The sidecar can be made to dynamically reload changed declarative topic subscriptions without restarting by enabling the `HotReload` feature gate. To prevent reprocessing or loss of unprocessed messages, in-flight messages between Dapr and your application are unaffected during hot reload events.
You can subscribe declaratively to a topic using an external component file. This example uses a YAML component file named `subscription.yaml`:
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order
spec:
topic: orders
routes:
default: /orders
pubsubname: pubsub
scopes:
- orderprocessing
Here the subscription called `order`:
- Uses the pub/sub component called `pubsub` to subscribe to the topic called `orders`.
- Sets the `route` field to send all topic messages to the `/orders` endpoint in the app.
- Sets the `scopes` field to scope this subscription for access only by apps with ID `orderprocessing`.

When running Dapr, set the YAML component file path to point Dapr to the component.
dapr run --app-id myapp --resources-path ./myComponents -- dotnet run
dapr run --app-id myapp --resources-path ./myComponents -- mvn spring-boot:run
dapr run --app-id myapp --resources-path ./myComponents -- python3 app.py
dapr run --app-id myapp --resources-path ./myComponents -- npm start
dapr run --app-id myapp --resources-path ./myComponents -- go run app.go
In Kubernetes, apply the component to the cluster:
kubectl apply -f subscription.yaml
In your application code, subscribe to the topic specified in the Dapr pub/sub component.
//Subscribe to a topic
[HttpPost("orders")]
public void getCheckout([FromBody] int orderId)
{
Console.WriteLine("Subscriber received : " + orderId);
}
import io.dapr.client.domain.CloudEvent;
//Subscribe to a topic
@PostMapping(path = "/orders")
public Mono<Void> getCheckout(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
log.info("Subscriber received: " + cloudEvent.getData());
}
});
}
from cloudevents.sdk.event import v1
#Subscribe to a topic
@app.route('/orders', methods=['POST'])
def checkout(event: v1.Event) -> None:
data = json.loads(event.Data())
logging.info('Subscriber received: ' + str(data))
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
// listen to the declarative route
app.post('/orders', (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
//Subscribe to a topic
var sub = &common.Subscription{
PubsubName: "pubsub",
Topic: "orders",
Route: "/orders",
}
func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
log.Printf("Subscriber received: %s", e.Data)
return false, nil
}
The `/orders` endpoint matches the `route` defined in the subscription and this is where Dapr sends all topic messages.
Streaming subscriptions are subscriptions defined in application code that can be dynamically stopped and started at runtime. Messages are pulled by the application from Dapr. This means no endpoint is needed to subscribe to a topic, and it’s possible to subscribe without any app configured on the sidecar at all. Any number of pubsubs and topics can be subscribed to at once. As messages are sent to the given message handler code, there is no concept of routes or bulk subscriptions.
Note: Only a single pubsub/topic pair per application may be subscribed at a time.
The example below shows the different ways to stream subscribe to a topic.
You can use the `SubscribeAsync` method on the `DaprPublishSubscribeClient` to configure the message handler used to pull messages from the stream.
using System.Text;
using Dapr.Messaging.PublishSubscribe;
using Dapr.Messaging.PublishSubscribe.Extensions;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddDaprPubSubClient();
var app = builder.Build();
var messagingClient = app.Services.GetRequiredService<DaprPublishSubscribeClient>();
//Create a dynamic streaming subscription and subscribe with a timeout of 30 seconds and 10 seconds for message handling
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var subscription = await messagingClient.SubscribeAsync("pubsub", "myTopic",
new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(10), TopicResponseAction.Retry)),
HandleMessageAsync, cancellationTokenSource.Token);
await Task.Delay(TimeSpan.FromMinutes(1));
//When you're done with the subscription, simply dispose of it
await subscription.DisposeAsync();
return;
//Process each message returned from the subscription
Task<TopicResponseAction> HandleMessageAsync(TopicMessage message, CancellationToken cancellationToken = default)
{
try
{
//Do something with the message
Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
return Task.FromResult(TopicResponseAction.Success);
}
catch
{
return Task.FromResult(TopicResponseAction.Retry);
}
}
Learn more about streaming subscriptions using the .NET SDK client.
You can use the `subscribe` method, which returns a `Subscription` object and allows you to pull messages from the stream by calling the `next_message` method. This runs in the calling thread and may block the main thread while waiting for messages.
import time
from dapr.clients import DaprClient
from dapr.clients.grpc.subscription import StreamInactiveError
counter = 0
def process_message(message):
global counter
counter += 1
# Process the message here
print(f'Processing message: {message.data()} from {message.topic()}...')
return 'success'
def main():
with DaprClient() as client:
global counter
subscription = client.subscribe(
pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead'
)
try:
while counter < 5:
try:
message = subscription.next_message()
except StreamInactiveError as e:
print('Stream is inactive. Retrying...')
time.sleep(1)
continue
if message is None:
print('No message received within timeout period.')
continue
# Process the message
response_status = process_message(message)
if response_status == 'success':
subscription.respond_success(message)
elif response_status == 'retry':
subscription.respond_retry(message)
elif response_status == 'drop':
subscription.respond_drop(message)
finally:
print("Closing subscription...")
subscription.close()
if __name__ == '__main__':
main()
You can also use the `subscribe_with_handler` method, which accepts a callback function executed for each message received from the stream. This runs in a separate thread, so it doesn’t block the main thread.
import time
from dapr.clients import DaprClient
from dapr.clients.grpc._response import TopicEventResponse
counter = 0
def process_message(message):
# Process the message here
global counter
counter += 1
print(f'Processing message: {message.data()} from {message.topic()}...')
return TopicEventResponse('success')
def main():
with (DaprClient() as client):
# This will start a new thread that will listen for messages
# and process them in the `process_message` function
close_fn = client.subscribe_with_handler(
pubsub_name='pubsub', topic='orders', handler_fn=process_message,
dead_letter_topic='orders_dead'
)
while counter < 5:
time.sleep(1)
print("Closing subscription...")
close_fn()
if __name__ == '__main__':
main()
Learn more about streaming subscriptions using the Python SDK client.
package main
import (
"context"
"log"
"github.com/dapr/go-sdk/client"
)
func main() {
cl, err := client.NewClient()
if err != nil {
log.Fatal(err)
}
sub, err := cl.Subscribe(context.Background(), client.SubscriptionOptions{
PubsubName: "pubsub",
Topic: "orders",
})
if err != nil {
panic(err)
}
// Close must always be called.
defer sub.Close()
for {
msg, err := sub.Receive()
if err != nil {
panic(err)
}
// Process the event
// We _MUST_ always signal the result of processing the message, else the
// message will not be considered as processed and will be redelivered or
// dead lettered.
// msg.Retry()
// msg.Drop()
if err := msg.Success(); err != nil {
panic(err)
}
}
}
or
package main
import (
"context"
"log"
"github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
)
func main() {
cl, err := client.NewClient()
if err != nil {
log.Fatal(err)
}
stop, err := cl.SubscribeWithHandler(context.Background(),
client.SubscriptionOptions{
PubsubName: "pubsub",
Topic: "orders",
},
eventHandler,
)
if err != nil {
panic(err)
}
// Stop must always be called.
defer stop()
<-make(chan struct{})
}
func eventHandler(e *common.TopicEvent) common.SubscriptionResponseStatus {
	// Process message here
	// Other possible responses:
	// common.SubscriptionResponseStatusRetry
	// common.SubscriptionResponseStatusDrop
	return common.SubscriptionResponseStatusSuccess
}
Watch this video for an overview on streaming subscriptions:
The dynamic programmatic approach returns the `routes` JSON structure within the code, unlike the declarative approach’s `route` YAML structure.

Note: Programmatic subscriptions are only read once during application start-up. You cannot dynamically add new programmatic subscriptions at runtime, only declare new ones at compile time.
In the example below, you define the values found in the declarative YAML subscription above within the application code.
[Topic("pubsub", "orders")]
[HttpPost("/orders")]
public async Task<ActionResult<Order>>Checkout(Order order, [FromServices] DaprClient daprClient)
{
// Logic
return order;
}
or
// Dapr subscription in [Topic] routes orders topic to this route
app.MapPost("/orders", [Topic("pubsub", "orders")] (Order order) => {
Console.WriteLine("Subscriber received : " + order);
return Results.Ok(order);
});
Both of the handlers defined above also need to be mapped to configure the `dapr/subscribe` endpoint. This is done in the application startup code while defining endpoints.
app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
});
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Topic(name = "orders", pubsubName = "pubsub")
@PostMapping(path = "/orders")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
return Mono.fromRunnable(() -> {
try {
System.out.println("Subscriber received: " + cloudEvent.getData());
System.out.println("Subscriber received: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [
{
'pubsubname': 'pubsub',
'topic': 'orders',
'routes': {
'rules': [
{
'match': 'event.type == "order"',
'path': '/orders'
},
],
'default': '/orders'
}
}]
return jsonify(subscriptions)
@app.route('/orders', methods=['POST'])
def ds_subscriber():
print(request.json, flush=True)
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
app.run()
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
const port = 3000
app.get('/dapr/subscribe', (req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "orders",
routes: {
rules: [
{
match: 'event.type == "order"',
path: '/orders'
},
],
      default: '/orders'
}
}
]);
})
app.post('/orders', (req, res) => {
console.log(req.body);
res.sendStatus(200);
});
app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/gorilla/mux"
)
const appPort = 3000
type subscription struct {
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata,omitempty"`
Routes routes `json:"routes"`
}
type routes struct {
Rules []rule `json:"rules,omitempty"`
Default string `json:"default,omitempty"`
}
type rule struct {
Match string `json:"match"`
Path string `json:"path"`
}
// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
t := []subscription{
{
PubsubName: "pubsub",
Topic: "orders",
Routes: routes{
Rules: []rule{
{
Match: `event.type == "order"`,
Path: "/orders",
},
},
Default: "/orders",
},
},
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(t)
}
func main() {
router := mux.NewRouter().StrictSlash(true)
router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}
There are times when applications might not be able to handle messages for a variety of reasons. For example, there could be transient issues retrieving data needed to process a message, or the app's business logic may fail and return an error. Dead letter topics are used to forward messages that cannot be delivered to a subscribing app. This eases the pressure on applications by freeing them from dealing with these failed messages, allowing developers to write code that reads from the dead letter topic and either fixes the message and resends it, or abandons it completely.
Dead letter topics are typically used along with a retry resiliency policy and a dead letter subscription that handles the required logic for dealing with the messages forwarded from the dead letter topic.
When a dead letter topic is set, any message that failed to be delivered to an app for a configured topic is put on the dead letter topic to be forwarded to a subscription that handles these messages. This could be the same app or a completely different one.
Dapr enables dead letter topics for all of its pub/sub components, even if the underlying system does not support this feature natively. For example, the AWS SNS component has a dead letter queue and RabbitMQ has dead letter topics. You will need to ensure that you configure such components appropriately.
The diagram below is an example of how dead letter topics work. First a message is sent from a publisher on an orders topic. Dapr receives the message on behalf of a subscriber application, but the orders topic message fails to be delivered to the /checkout endpoint on the application, even after retries. As a result of the failure to deliver, the message is forwarded to the poisonMessages topic, which delivers it to the /failedMessages endpoint to be processed, in this case on the same application. The failedMessages processing code could drop the message or resend a new message.
The following YAML shows how to configure a subscription with a dead letter topic named poisonMessages for messages consumed from the orders topic. This subscription is scoped to an app with a checkout ID.
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order
spec:
topic: orders
routes:
default: /checkout
pubsubname: pubsub
deadLetterTopic: poisonMessages
scopes:
- checkout
var deadLetterTopic = "poisonMessages"
sub, err := cl.Subscribe(context.Background(), client.SubscriptionOptions{
PubsubName: "pubsub",
Topic: "orders",
DeadLetterTopic: &deadLetterTopic,
})
The JSON returned from the /subscribe endpoint shows how to configure a dead letter topic named poisonMessages for messages consumed from the orders topic.
app.get('/dapr/subscribe', (_req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "orders",
route: "/checkout",
deadLetterTopic: "poisonMessages"
}
]);
});
By default, when a dead letter topic is set, any failing message immediately goes to the dead letter topic. As a result, it is recommended to always have a retry policy set when using dead letter topics in a subscription. To enable the retry of a message before sending it to the dead letter topic, apply a retry resiliency policy to the pub/sub component.
This example shows how to set a constant retry policy named pubsubRetry, with 10 maximum delivery attempts applied every 5 seconds for the pubsub pub/sub component.
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
name: myresiliency
spec:
policies:
retries:
pubsubRetry:
policy: constant
duration: 5s
maxRetries: 10
targets:
components:
pubsub:
inbound:
retry: pubsubRetry
Remember to also configure a subscription for handling the dead letter topic. For example, you can create another declarative subscription to receive these messages on the same or a different application. The example below shows the checkout application subscribing to the poisonMessages topic with another subscription, sending these messages to be handled by the /failedMessages endpoint.
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: deadlettertopics
spec:
topic: poisonMessages
routes:
rules:
- match:
path: /failedMessages
pubsubname: pubsub
scopes:
- checkout
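To illustrate, a minimal sketch of a /failedMessages handler in Python (assuming a Flask app, as in the earlier examples) might look like the following. A real handler could also repair the payload and republish it via the pub/sub API instead of dropping it.

import json
from flask import Flask, request

app = Flask(__name__)

# Receives messages forwarded from the poisonMessages dead letter topic
@app.route('/failedMessages', methods=['POST'])
def failed_messages():
    dead_letter_event = request.json
    # Log the failed message for later analysis
    print('Dead letter message received: ' + json.dumps(dead_letter_event), flush=True)
    # Returning a 2xx response acknowledges (drops) the message;
    # alternatively, fix the payload and republish it
    return json.dumps({'success': True}), 200, {'ContentType': 'application/json'}

app.run(port=5000)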
Watch this video for an overview of dead letter topics:
You’ve set up Dapr’s pub/sub API building block, and your applications are publishing and subscribing to topics smoothly, using a centralized message broker. What if you’d like to perform simple A/B testing, blue/green deployments, or even canary deployments for your applications? Even with using Dapr, this can prove difficult.
Dapr solves multi-tenancy at scale with its pub/sub namespace consumer groups construct.
Let’s say you have a Kubernetes cluster, with two applications (App1 and App2) deployed to the same namespace (namespace-a). App2 publishes to a topic called order, while App1 subscribes to the topic called order. This creates two consumer groups, named after your applications (App1 and App2).
In order to perform simple testing and deployments while using a centralized message broker, you create another namespace with two applications of the same app-id, App1 and App2.
Dapr creates consumer groups using the app-id of individual applications, so the consumer group names will remain App1 and App2.
To avoid this, you’d then need to have something “creep” into your code to change the app-id, depending on the namespace in which you’re running. This workaround is cumbersome and a significant pain point.
Not only does Dapr allow you to change the behavior of a consumer group with a consumerID set to a UUID or pod name, Dapr also provides a namespace construct that lives in the pub/sub component metadata. For example, using Redis as your message broker:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
- name: consumerID
value: "{namespace}"
By configuring consumerID with the {namespace} value, you’ll be able to use the same app-id with the same topics from different namespaces.
In the diagram above, you have two namespaces, each with applications of the same app-id, publishing and subscribing to the same orders topic on the same centralized message broker. This time, however, Dapr has created consumer group names prefixed with the namespace in which they’re running.
Without you needing to change your code/app-id, the namespace consumer group allows you to:
- Use the same app-id across namespaces
Simply include the "{namespace}" consumer group construct in your component metadata. You don’t need to encode the namespace in the metadata; Dapr understands the namespace it is running in and completes the namespace value for you, like a dynamic metadata value injected by the runtime.
Watch this video for an overview on pub/sub multi-tenancy:
Unlike Deployments, where Pods are ephemeral, StatefulSets allow the deployment of stateful applications on Kubernetes by keeping a sticky identity for each Pod.
Below is an example of a StatefulSet with Dapr:
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: python-subscriber
spec:
selector:
matchLabels:
app: python-subscriber # has to match .spec.template.metadata.labels
serviceName: "python-subscriber"
replicas: 3
template:
metadata:
labels:
app: python-subscriber # has to match .spec.selector.matchLabels
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "python-subscriber"
dapr.io/app-port: "5001"
spec:
containers:
- name: python-subscriber
image: ghcr.io/dapr/samples/pubsub-python-subscriber:latest
ports:
- containerPort: 5001
imagePullPolicy: Always
When subscribing to a pub/sub topic via Dapr, the application can define the consumerID, which determines the subscriber’s position in the queue or topic. With the sticky identity of StatefulSet Pods, you can have a unique consumerID per Pod, so each Pod keeps its own position even as the subscriber application scales horizontally. Dapr keeps track of the name of each Pod, which can be used when declaring components using the {podName} marker.
On scaling the number of subscribers of a given topic, each Dapr component has unique settings that determine the behavior. Usually, there are two options for multiple consumers:
- Broadcast: each message published to the topic is consumed by all subscribers.
- Shared: a message is consumed by only one of the subscribers.
Kafka isolates each subscriber by consumerID, giving it its own position in the topic. When an instance restarts, it reuses the same consumerID and continues from its last known position, without skipping messages. The component below demonstrates how a Kafka component can be used by multiple Pods:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.kafka
version: v1
metadata:
- name: brokers
value: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
- name: consumerID
value: "{podName}"
- name: authRequired
value: "false"
The MQTT3 protocol has shared topics, allowing multiple subscribers to “compete” for messages from the topic, meaning a message is only processed by one of them. For example:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mqtt-pubsub
spec:
type: pubsub.mqtt3
version: v1
metadata:
- name: consumerID
value: "{podName}"
- name: cleanSession
value: "true"
- name: url
value: "tcp://admin:public@localhost:1883"
- name: qos
value: 1
- name: retain
value: "false"
Namespaces or component scopes can be used to limit component access to particular applications. Adding these application scopes to a component limits its use to only applications with the specified IDs.
In addition to this general component scope, the following can be limited for pub/sub components:
- Which topics can be used (published or subscribed to)
- Which applications are allowed to publish to specific topics
- Which applications are allowed to subscribe to specific topics
This is called pub/sub topic scoping.
Pub/sub scopes are defined for each pub/sub component. You may have a pub/sub component named pubsub that has one set of scopes, and another pubsub2 with a different set.
To use this topic scoping, three metadata properties can be set for a pub/sub component:
- spec.metadata.publishingScopes
  - A semicolon-separated list of applications and comma-separated topic lists, allowing that app to publish to that list of topics.
  - If nothing is specified in publishingScopes (default behavior), all apps can publish to all topics.
  - To deny an app the ability to publish to any topic, leave the topics list empty (app1=;app2=topic2).
  - For example, app1=topic1;app2=topic2,topic3;app3= will allow app1 to publish to topic1 and nothing else, app2 to publish to topic2 and topic3 only, and app3 to publish to nothing.
- spec.metadata.subscriptionScopes
  - A semicolon-separated list of applications and comma-separated topic lists, allowing that app to subscribe to that list of topics.
  - If nothing is specified in subscriptionScopes (default behavior), all apps can subscribe to all topics.
  - For example, app1=topic1;app2=topic2,topic3 will allow app1 to subscribe to topic1 only and app2 to subscribe to topic2 and topic3.
- spec.metadata.allowedTopics
  - A comma-separated list of allowed topics for all applications.
  - If allowedTopics is not set (default behavior), all topics are valid. subscriptionScopes and publishingScopes still take place if present.
  - publishingScopes or subscriptionScopes can be used in conjunction with allowedTopics to add granular limitations.
- spec.metadata.protectedTopics
  - A comma-separated list of protected topics for all applications.
  - If a topic is marked as protected, then an application must be explicitly granted publish or subscribe permissions through publishingScopes or subscriptionScopes to publish to or subscribe to it.
These metadata properties can be used for all pub/sub components. The following examples use Redis as the pub/sub component.
Limiting which applications can publish/subscribe to topics can be useful if you have topics which contain sensitive information and only a subset of your applications are allowed to publish or subscribe to these.
It can also be used across all topics, so that you always have a “ground truth” for which applications are using which topics as publishers/subscribers.
Here is an example of three applications and three topics:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: publishingScopes
value: "app1=topic1;app2=topic2,topic3;app3="
- name: subscriptionScopes
value: "app2=;app3=topic1"
The table below shows which applications are allowed to publish into the topics:
| topic1 | topic2 | topic3
---|---|---|---
app1 | ✅ | |
app2 | | ✅ | ✅
app3 | | |
The table below shows which applications are allowed to subscribe to the topics:
| topic1 | topic2 | topic3
---|---|---|---
app1 | ✅ | ✅ | ✅
app2 | | |
app3 | ✅ | |
Note: If an application is not listed (e.g. app1 in subscriptionScopes), it is allowed to subscribe to all topics. Because allowedTopics is not used and app1 does not have any subscription scopes, it can also use additional topics not listed above.
A topic is created if a Dapr application sends a message to it. In some scenarios, this topic creation should be governed; for example, to prevent topics from being created by misconfigured or malicious applications. In these situations, allowedTopics can be used.
Here is an example of three allowed topics:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: allowedTopics
value: "topic1,topic2,topic3"
All applications can use these topics, but only these topics; no others are allowed.
Combining allowedTopics and scopes
Sometimes you want to combine both scopes, so that you have only a fixed set of allowed topics and also specify scoping to certain applications.
Here is an example of three applications and two topics:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: allowedTopics
value: "A,B"
- name: publishingScopes
value: "app1=A"
- name: subscriptionScopes
value: "app1=;app2=A"
Note: The third application is not listed, because if an app is not specified inside the scopes, it is allowed to use all topics.
The table below shows which application is allowed to publish into the topics:
| A | B | C
---|---|---|---
app1 | ✅ | |
app2 | ✅ | ✅ |
app3 | ✅ | ✅ |
The table below shows which application is allowed to subscribe to the topics:
| A | B | C
---|---|---|---
app1 | | |
app2 | ✅ | |
app3 | ✅ | ✅ |
If your topic involves sensitive data, each new application must be explicitly listed in the publishingScopes and subscriptionScopes to ensure it cannot read from or write to that topic. Alternatively, you can designate the topic as ‘protected’ (using protectedTopics) and grant access only to specific applications that genuinely require it.
Here is an example of three applications and three topics, two of which are protected:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: "localhost:6379"
- name: redisPassword
value: ""
- name: protectedTopics
value: "A,B"
- name: publishingScopes
value: "app1=A,B;app2=B"
- name: subscriptionScopes
value: "app1=A,B;app2=B"
In the example above, topics A and B are marked as protected. As a result, even though app3 is not listed under publishingScopes or subscriptionScopes, it cannot interact with these topics.
The table below shows which application is allowed to publish into the topics:
| A | B | C
---|---|---|---
app1 | ✅ | ✅ |
app2 | | ✅ |
app3 | | | ✅
The table below shows which application is allowed to subscribe to the topics:
| A | B | C
---|---|---|---
app1 | ✅ | ✅ |
app2 | | ✅ |
app3 | | | ✅
Dapr enables per-message time-to-live (TTL). This means that applications can set time-to-live per message, and subscribers do not receive those messages after expiration.
All Dapr pub/sub components are compatible with message TTL, as Dapr handles the TTL logic within the runtime. Simply set the ttlInSeconds metadata when publishing a message.
In some components, such as Kafka, time-to-live can be configured in the topic via retention.ms, as per the component documentation. With message TTL in Dapr, applications using Kafka can now set time-to-live per message, in addition to per topic.
When message time-to-live has native support in the pub/sub component, Dapr simply forwards the time-to-live configuration without adding any extra logic, keeping predictable behavior. This is helpful when the expired messages are handled differently by the component. For example, with Azure Service Bus, expired messages are stored in the dead letter queue and are not simply deleted.
Azure Service Bus supports entity level time-to-live. This means that messages have a default time-to-live but can also be set with a shorter timespan at publishing time. Dapr propagates the time-to-live metadata for the message and lets Azure Service Bus handle the expiration directly.
If messages are consumed by subscribers not using Dapr, the expired messages are not automatically dropped, as expiration is handled by the Dapr runtime when a Dapr sidecar receives a message. However, subscribers can programmatically drop expired messages by adding logic to handle the expiration attribute in the cloud event, which follows the RFC3339 format.
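For example, a non-Dapr subscriber could drop expired messages with logic like the following sketch, which assumes the incoming cloud event is available as a Python dictionary carrying the expiration attribute:

from datetime import datetime, timezone

def is_expired(cloud_event: dict) -> bool:
    # The expiration attribute, when present, is an RFC3339 timestamp
    expiration = cloud_event.get('expiration')
    if expiration is None:
        return False
    expires_at = datetime.fromisoformat(expiration.replace('Z', '+00:00'))
    return datetime.now(timezone.utc) >= expires_at

event = {'id': '123', 'data': {'order-number': '345'}, 'expiration': '2025-01-01T00:00:00Z'}
if is_expired(event):
    print('Dropping expired message')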
When non-Dapr subscribers use components such as Azure Service Bus, which natively handle message TTL, they do not receive expired messages. Here, no extra logic is needed.
Message TTL can be set in the metadata as part of the publishing request:
curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/TOPIC_A?metadata.ttlInSeconds=120 -H "Content-Type: application/json" -d '{"order-number": "345"}'
import json
from dapr.clients import DaprClient
with DaprClient() as d:
req_data = {
'order-number': '345'
}
# Create a typed message with content type and body
resp = d.publish_event(
pubsub_name='pubsub',
topic='TOPIC_A',
data=json.dumps(req_data),
publish_metadata={'ttlInSeconds': '120'}
)
# Print the request
print(req_data, flush=True)
<?php
require_once __DIR__.'/vendor/autoload.php';
$app = \Dapr\App::create();
$app->run(function(\DI\FactoryInterface $factory) {
$publisher = $factory->make(\Dapr\PubSub\Publish::class, ['pubsub' => 'pubsub']);
$publisher->topic('TOPIC_A')->publish('data', ['ttlInSeconds' => '120']);
});
See this guide for a reference on the pub/sub API.
With the bulk publish and subscribe APIs, you can publish and subscribe to multiple messages in a single request. When writing applications that need to send or receive a large number of messages, using bulk operations allows you to achieve high throughput by reducing the overall number of requests between the Dapr sidecar, the application, and the underlying pub/sub broker.
The bulk publish API allows you to publish multiple messages to a topic in a single request. It is non-transactional, i.e., from a single bulk request, some messages can succeed and some can fail. If any of the messages fail to publish, the bulk publish operation returns a list of failed messages.
The bulk publish operation also does not guarantee any ordering of messages.
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;
class BulkPublisher {
private static final String PUBSUB_NAME = "my-pubsub-name";
private static final String TOPIC_NAME = "topic-a";
public void publishMessages() {
try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
// Create a list of messages to publish
List<String> messages = new ArrayList<>();
for (int i = 0; i < 10; i++) {
String message = String.format("This is message #%d", i);
messages.add(message);
}
// Publish list of messages using the bulk publish API
BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block();
}
}
}
import { DaprClient } from "@dapr/dapr";
const pubSubName = "my-pubsub-name";
const topic = "topic-a";
async function start() {
const client = new DaprClient();
// Publish multiple messages to a topic.
await client.pubsub.publishBulk(pubSubName, topic, ["message 1", "message 2", "message 3"]);
// Publish multiple messages to a topic with explicit bulk publish messages.
const bulkPublishMessages = [
{
entryID: "entry-1",
contentType: "application/json",
event: { hello: "foo message 1" },
},
{
entryID: "entry-2",
contentType: "application/cloudevents+json",
event: {
specversion: "1.0",
source: "/some/source",
type: "example",
id: "1234",
data: "foo message 2",
datacontenttype: "text/plain"
},
},
{
entryID: "entry-3",
contentType: "text/plain",
event: "foo message 3",
},
];
await client.pubsub.publishBulk(pubSubName, topic, bulkPublishMessages);
}
start().catch((e) => {
console.error(e);
process.exit(1);
});
using System;
using System.Collections.Generic;
using Dapr.Client;
const string PubsubName = "my-pubsub-name";
const string TopicName = "topic-a";
IReadOnlyList<object> BulkPublishData = new List<object>() {
new { Id = "17", Amount = 10m },
new { Id = "18", Amount = 20m },
new { Id = "19", Amount = 30m }
};
using var client = new DaprClientBuilder().Build();
var res = await client.BulkPublishEventAsync(PubsubName, TopicName, BulkPublishData);
if (res == null) {
throw new Exception("null response from dapr");
}
if (res.FailedEntries.Count > 0)
{
Console.WriteLine("Some events failed to be published!");
foreach (var failedEntry in res.FailedEntries)
{
Console.WriteLine("EntryId: " + failedEntry.Entry.EntryId + " Error message: " +
failedEntry.ErrorMessage);
}
}
else
{
Console.WriteLine("Published all events!");
}
import requests
import json
base_url = "http://localhost:3500/v1.0-alpha1/publish/bulk/{}/{}"
pubsub_name = "my-pubsub-name"
topic_name = "topic-a"
payload = [
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
}
]
response = requests.post(base_url.format(pubsub_name, topic_name), json=payload)
print(response.status_code)
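Because bulk publish is non-transactional, you may want to inspect the response for partial failures. The following sketch extends the example above; it assumes the error response body lists the failed entries (see the pub/sub API reference for the exact response shape):

if response.status_code != 200:
    # On partial failure, the response body may include the entries
    # that failed to publish, keyed by their entryId
    result = response.json()
    for entry in result.get('failedEntries', []):
        print(f"Failed entryId: {entry.get('entryId')}, error: {entry.get('error')}")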
package main
import (
"fmt"
"strings"
"net/http"
"io/ioutil"
)
const (
pubsubName = "my-pubsub-name"
topicName = "topic-a"
baseUrl = "http://localhost:3500/v1.0-alpha1/publish/bulk/%s/%s"
)
func main() {
url := fmt.Sprintf(baseUrl, pubsubName, topicName)
method := "POST"
payload := strings.NewReader(`[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
}
]`)
client := &http.Client {}
req, _ := http.NewRequest(method, url, payload)
req.Header.Add("Content-Type", "application/json")
	res, err := client.Do(req)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(body))
}
curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a \
-H 'Content-Type: application/json' \
-d '[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
        }
      ]'
Invoke-RestMethod -Method Post -ContentType 'application/json' -Uri 'http://localhost:3500/v1.0-alpha1/publish/bulk/my-pubsub-name/topic-a' `
-Body '[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
        }
      ]'
The bulk subscribe API allows you to subscribe to multiple messages from a topic in a single request. As we know from How to: Publish & Subscribe to topics, there are three ways to subscribe to topic(s): declaratively, via streaming, and programmatically.
To bulk subscribe to topic(s), use the bulkSubscribe spec attribute, as in the following example:
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order-pub-sub
spec:
topic: orders
routes:
default: /checkout
pubsubname: order-pub-sub
bulkSubscribe:
enabled: true
maxMessagesCount: 100
maxAwaitDurationMs: 40
scopes:
- orderprocessing
- checkout
In the example above, bulkSubscribe is optional. If you use bulkSubscribe, then:
- enabled is mandatory and enables or disables bulk subscriptions on this topic.
- You can optionally configure the maximum number of messages (maxMessagesCount) delivered in a bulk message. The default value of maxMessagesCount for components not supporting bulk subscribe is 100, i.e. for default bulk events between the app and Dapr. Please refer to How components handle publishing and subscribing to bulk messages. If a component supports bulk subscribe, the default value for this parameter can be found in that component's documentation.
- You can optionally configure the maximum duration to wait (maxAwaitDurationMs) before a bulk message is sent to the app. The default value of maxAwaitDurationMs for components not supporting bulk subscribe is 1000, i.e. for default bulk events between the app and Dapr. Please refer to How components handle publishing and subscribing to bulk messages. If a component supports bulk subscribe, the default value for this parameter can be found in that component's documentation.
The application receives an EntryId associated with each entry (individual message) in the bulk message. This EntryId must be used by the app to communicate the status of that particular entry. If the app fails to notify on an EntryId status, it's considered a RETRY.
A JSON-encoded payload body with the processing status against each entry needs to be sent:
{
"statuses":
[
{
"entryId": "<entryId1>",
"status": "<status>"
},
{
"entryId": "<entryId2>",
"status": "<status>"
}
]
}
Possible status values:
Status | Description
---|---
SUCCESS | Message is processed successfully
RETRY | Message to be retried by Dapr
DROP | Warning is logged and message is dropped
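As an illustration, an HTTP subscriber could build this status payload per entry. The following is a sketch assuming a Flask handler and a bulk message body that carries an entries list with entryId fields, matching the payload format above:

import json
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/checkout', methods=['POST'])
def checkout():
    statuses = []
    # Echo back a status for every entryId received in the bulk message
    for entry in request.json.get('entries', []):
        try:
            print(f"Received: {entry.get('event')}")
            statuses.append({'entryId': entry['entryId'], 'status': 'SUCCESS'})
        except Exception:
            statuses.append({'entryId': entry['entryId'], 'status': 'RETRY'})
    return jsonify({'statuses': statuses})

if __name__ == '__main__':
    app.run(port=5000)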
Refer to Expected HTTP Response for Bulk Subscribe for further insights on response.
The following code examples demonstrate how to use Bulk Subscribe.
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import reactor.core.publisher.Mono;
import java.util.ArrayList;
import java.util.List;
class BulkSubscriber {
@BulkSubscribe()
// @BulkSubscribe(maxMessagesCount = 100, maxAwaitDurationMs = 40)
@Topic(name = "topicbulk", pubsubName = "orderPubSub")
@PostMapping(path = "/topicbulk")
public Mono<BulkSubscribeAppResponse> handleBulkMessage(
@RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
return Mono.fromCallable(() -> {
List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
try {
CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
e.printStackTrace();
entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
});
}
}
import { DaprServer } from "@dapr/dapr";
const pubSubName = "orderPubSub";
const topic = "topicbulk";
const daprHost = process.env.DAPR_HOST || "127.0.0.1";
const daprPort = process.env.DAPR_HTTP_PORT || "3502";
const serverHost = process.env.SERVER_HOST || "127.0.0.1";
const serverPort = process.env.APP_PORT || 5001;
async function start() {
const server = new DaprServer({
serverHost,
serverPort,
clientOptions: {
daprHost,
daprPort,
},
});
  // Bulk subscribe to messages on the topic with the default configuration.
  await server.pubsub.bulkSubscribeWithDefaultConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)));
  // Bulk subscribe with a specific maxMessagesCount and maxAwaitDurationMs.
  await server.pubsub.bulkSubscribeWithConfig(pubSubName, topic, (data) => console.log("Subscriber received: " + JSON.stringify(data)), 100, 40);
  await server.start();
}
using Microsoft.AspNetCore.Mvc;
using Dapr.AspNetCore;
using Dapr;
namespace DemoApp.Controllers;
[ApiController]
[Route("[controller]")]
public class BulkMessageController : ControllerBase
{
private readonly ILogger<BulkMessageController> logger;
public BulkMessageController(ILogger<BulkMessageController> logger)
{
this.logger = logger;
}
[BulkSubscribe("messages", 10, 10)]
[Topic("pubsub", "messages")]
public ActionResult<BulkSubscribeAppResponse> HandleBulkMessages([FromBody] BulkSubscribeMessage<BulkMessageModel<BulkMessageModel>> bulkMessages)
{
List<BulkSubscribeAppResponseEntry> responseEntries = new List<BulkSubscribeAppResponseEntry>();
logger.LogInformation($"Received {bulkMessages.Entries.Count()} messages");
foreach (var message in bulkMessages.Entries)
{
try
{
logger.LogInformation($"Received a message with data '{message.Event.Data.MessageData}'");
responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
}
catch (Exception e)
{
logger.LogError(e.Message);
responseEntries.Add(new BulkSubscribeAppResponseEntry(message.EntryId, BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(responseEntries);
}
public class BulkMessageModel
{
public string MessageData { get; set; }
}
}
Currently, you can only bulk subscribe in Python using an HTTP client.
import json
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
# Define the bulk subscribe configuration
subscriptions = [{
"pubsubname": "pubsub",
"topic": "TOPIC_A",
"route": "/checkout",
"bulkSubscribe": {
"enabled": True,
"maxMessagesCount": 3,
"maxAwaitDurationMs": 40
}
}]
print('Dapr pub/sub is subscribed to: ' + json.dumps(subscriptions))
return jsonify(subscriptions)
# Define the endpoint to handle incoming messages
@app.route('/checkout', methods=['POST'])
def checkout():
messages = request.json
print(messages)
for message in messages:
print(f"Received message: {message}")
return json.dumps({'success': True}), 200, {'ContentType': 'application/json'}
if __name__ == '__main__':
app.run(port=5000)
For event publish/subscribe, two kinds of network transfers are involved:
1. From/to the application to/from the Dapr sidecar.
2. From/to the Dapr sidecar to/from the pub/sub broker.
These are the opportunities where optimization is possible. When optimized, bulk requests are made, which reduce the overall number of calls and thus increase throughput and provide better latency.
On enabling Bulk Publish and/or Bulk Subscribe, the communication between the app and the Dapr sidecar (point 1 above) is optimized for all components.
Optimization from the Dapr sidecar to the pub/sub broker depends on a number of factors; for example, whether the underlying broker supports bulk operations and whether the Dapr component has been updated to use them.
Currently, the following components are updated to support this level of optimization:
Component | Bulk Publish | Bulk Subscribe
---|---|---
Kafka | Yes | Yes
Azure Service Bus | Yes | Yes
Azure Event Hubs | Yes | Yes
Watch the following demos and presentations about bulk pub/sub.