Pub/sub routing is an implementation of content-based routing, a messaging pattern that utilizes a DSL instead of imperative application code. With pub/sub routing, you use expressions to route CloudEvents (based on their contents) to different URIs/paths and event handlers in your application. If no route matches, then an optional default route is used. This proves useful as your applications expand to support multiple event versions or special cases.
While routing can be implemented with code, keeping routing rules external to the application can improve portability.
This feature is available to both the declarative and programmatic subscription approaches, but it does not apply to streaming subscriptions.
For declarative subscriptions, use dapr.io/v2alpha1 as the apiVersion. Here is an example of subscriptions.yaml using routing:
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  pubsubname: pubsub
  topic: inventory
  routes:
    rules:
      - match: event.type == "widget"
        path: /widgets
      - match: event.type == "gadget"
        path: /gadgets
    default: /products
scopes:
  - app1
  - app2
In the programmatic approach, the routes structure is returned instead of route. The JSON structure matches the declarative YAML:
import flask
from flask import request, jsonify
from flask_cors import CORS
import json

app = flask.Flask(__name__)
CORS(app)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    # Tell Dapr which topic to subscribe to and how to route its events.
    subscriptions = [
      {
        'pubsubname': 'pubsub',
        'topic': 'inventory',
        'routes': {
          'rules': [
            {
              'match': 'event.type == "widget"',
              'path': '/widgets'
            },
            {
              'match': 'event.type == "gadget"',
              'path': '/gadgets'
            },
          ],
          'default': '/products'
        }
      }]
    return jsonify(subscriptions)

@app.route('/products', methods=['POST'])
def ds_subscriber():
    # Default route: receives events that match none of the rules.
    print(request.json, flush=True)
    return json.dumps({'success': True}), 200, {'Content-Type': 'application/json'}

app.run()
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));

const port = 3000

app.get('/dapr/subscribe', (req, res) => {
  res.json([
    {
      pubsubname: "pubsub",
      topic: "inventory",
      routes: {
        rules: [
          {
            match: 'event.type == "widget"',
            path: '/widgets'
          },
          {
            match: 'event.type == "gadget"',
            path: '/gadgets'
          },
        ],
        default: '/products'
      }
    }
  ]);
})

app.post('/products', (req, res) => {
  console.log(req.body);
  res.sendStatus(200);
});

app.listen(port, () => console.log(`consumer app listening on port ${port}!`))
[Topic("pubsub", "inventory", "event.type ==\"widget\"", 1)]
[HttpPost("widgets")]
public async Task<ActionResult<Stock>> HandleWidget(Widget widget, [FromServices] DaprClient daprClient)
{
    // Logic
    return stock;
}

[Topic("pubsub", "inventory", "event.type ==\"gadget\"", 2)]
[HttpPost("gadgets")]
public async Task<ActionResult<Stock>> HandleGadget(Gadget gadget, [FromServices] DaprClient daprClient)
{
    // Logic
    return stock;
}

[Topic("pubsub", "inventory")]
[HttpPost("products")]
public async Task<ActionResult<Stock>> HandleProduct(Product product, [FromServices] DaprClient daprClient)
{
    // Logic
    return stock;
}
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"

	"github.com/gorilla/mux"
)

const appPort = 3000

type subscription struct {
	PubsubName string            `json:"pubsubname"`
	Topic      string            `json:"topic"`
	Metadata   map[string]string `json:"metadata,omitempty"`
	Routes     routes            `json:"routes"`
}

type routes struct {
	Rules   []rule `json:"rules,omitempty"`
	Default string `json:"default,omitempty"`
}

type rule struct {
	Match string `json:"match"`
	Path  string `json:"path"`
}

// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
	t := []subscription{
		{
			PubsubName: "pubsub",
			Topic:      "inventory",
			Routes: routes{
				Rules: []rule{
					{
						Match: `event.type == "widget"`,
						Path:  "/widgets",
					},
					{
						Match: `event.type == "gadget"`,
						Path:  "/gadgets",
					},
				},
				Default: "/products",
			},
		},
	}

	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(t)
}

func main() {
	router := mux.NewRouter().StrictSlash(true)
	router.HandleFunc("/dapr/subscribe", configureSubscribeHandler).Methods("GET")
	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", appPort), router))
}
<?php

require_once __DIR__.'/vendor/autoload.php';

$app = \Dapr\App::create(configure: fn(\DI\ContainerBuilder $builder) => $builder->addDefinitions(['dapr.subscriptions' => [
    // Assumption: routes is passed as a nested array holding the rules and the default path.
    new \Dapr\PubSub\Subscription(pubsubname: 'pubsub', topic: 'inventory', routes: [
        'rules' => [
            ['match' => 'event.type == "widget"', 'path' => '/widgets'],
            ['match' => 'event.type == "gadget"', 'path' => '/gadgets'],
        ],
        'default' => '/products',
    ]),
]]));

$app->post('/products', function(
    #[\Dapr\Attributes\FromBody]
    \Dapr\PubSub\CloudEvent $cloudEvent,
    \Psr\Log\LoggerInterface $logger
    ) {
        $logger->alert('Received event: {event}', ['event' => $cloudEvent]);
        return ['status' => 'SUCCESS'];
    }
);

$app->start();
In these examples, depending on the event.type, the application will be called on:
/widgets
/gadgets
/products
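The selection these subscriptions describe can be sketched in plain Python. This is only an illustration, not Dapr's implementation: Dapr evaluates CEL expressions, whereas the sketch stands in ordinary Python predicates for the match expressions, and it assumes rules are tried in the order listed with the first match winning. The names Rule and resolve_path are hypothetical.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Rule:
    """One routing rule: a predicate over the event and the path it routes to."""
    match: Callable[[dict], bool]
    path: str


# Python stand-ins for the CEL expressions used in the examples.
RULES: List[Rule] = [
    Rule(match=lambda event: event.get("type") == "widget", path="/widgets"),
    Rule(match=lambda event: event.get("type") == "gadget", path="/gadgets"),
]
DEFAULT_PATH = "/products"


def resolve_path(event: dict,
                 rules: List[Rule] = RULES,
                 default: Optional[str] = DEFAULT_PATH) -> Optional[str]:
    """Return the path of the first matching rule, or the default if none match."""
    for rule in rules:
        if rule.match(event):
            return rule.path
    return default
```

With these rules, an event whose type is neither widget nor gadget falls through to /products; if no default were configured, the sketch returns None to signal that no route applies.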
The expressions are written as Common Expression Language (CEL), where event represents the cloud event. Any of the attributes from the CloudEvents core specification can be referenced in the expression.
Match “important” messages:
has(event.data.important) && event.data.important == true
Match deposits greater than $10,000:
event.type == "deposit" && int(event.data.amount) > 10000
If event.data.amount is not cast as an integer, the match is not performed. For more information, see the CEL documentation.
Match multiple versions of a message:
event.type == "mymessage.v1"
event.type == "mymessage.v2"
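For the multiple-versions case, one option is to give each version its own rule so that it reaches its own handler. A minimal sketch of what a programmatic subscription might return, in the same shape as the earlier examples; the topic name mytopic, the three paths, and the helper build_subscription are hypothetical.

```python
import json


def build_subscription() -> list:
    """Build a subscription that routes each message version to its own path."""
    return [{
        "pubsubname": "pubsub",
        "topic": "mytopic",  # hypothetical topic name
        "routes": {
            "rules": [
                # Hypothetical paths, one per message version.
                {"match": 'event.type == "mymessage.v1"', "path": "/v1/mymessage"},
                {"match": 'event.type == "mymessage.v2"', "path": "/v2/mymessage"},
            ],
            # Hypothetical fallback for any other event type.
            "default": "/mymessage",
        },
    }]


if __name__ == "__main__":
    # Print the JSON body a /dapr/subscribe endpoint could return.
    print(json.dumps(build_subscription(), indent=2))
```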
For reference, the following attributes are from the CloudEvents specification.
As defined by the term data, CloudEvents may include domain-specific information about the occurrence. When present, this information will be encapsulated within data.
Description: The event payload. It is encoded into a media format specified by the datacontenttype attribute (e.g. application/json), and adheres to the dataschema format when those respective attributes are present.
The following attributes are required in all CloudEvents:
id
Type: String
Description: Identifies the event. Producers must ensure that source + id are unique for each distinct event. If a duplicate event is re-sent (e.g. due to a network error), it may have the same id. Consumers may assume that events with identical source and id are duplicates.
source
Type: URI-reference
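Description: Identifies the context in which an event happened. Often this includes information such as the type of the event source, the organization publishing the event, or the process that produced the event.
The exact syntax and semantics behind the data encoded in the URI are defined by the event producer.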
Producers must ensure that source + id are unique for each distinct event.
An application may:
Assign a unique source to each distinct producer, making it easier to produce unique IDs and preventing other producers from having the same source.
Use UUIDs, URNs, DNS authorities, or an application-specific scheme to create unique source identifiers.
A source may include more than one producer. In this case, the producers must collaborate to ensure that source + id are unique for each distinct event.
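Because source + id identifies a distinct event, a consumer can use that pair to drop redelivered events. The following is a minimal in-memory sketch of that idea; the class name Deduplicator is hypothetical, and a real handler would likely need a bounded or persistent store rather than an ever-growing set.

```python
class Deduplicator:
    """Remembers (source, id) pairs and flags events that were already seen."""

    def __init__(self) -> None:
        self._seen = set()

    def is_duplicate(self, event: dict) -> bool:
        """Return True if an event with the same source and id was seen before."""
        key = (event["source"], event["id"])
        if key in self._seen:
            return True
        self._seen.add(key)
        return False
```

Two events that share an id but come from different sources are treated as distinct, since uniqueness is only guaranteed for the combination of both attributes.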
Constraints:
Examples:
specversion
Type: String
Description: The version of the CloudEvents specification used by the event. This enables the interpretation of the context. Compliant event producers must use a value of 1.0 when referring to this version of the specification.
Currently, this attribute only includes the ‘major’ and ‘minor’ version numbers. This allows patch changes to the specification to be made without changing this property’s value in the serialization.
Note: for ‘release candidate’ releases, a suffix might be used for testing purposes.
Constraints:
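type
Type: String
Description: Contains a value describing the type of event related to the originating occurrence. Often this attribute is used for routing, observability, policy enforcement, etc. The format is producer-defined and might include information such as the version of the type. See Versioning of CloudEvents in the Primer for more information.
The following attributes are optional in CloudEvents. See the Notational Conventions section for more information on the definition of OPTIONAL.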
datacontenttype
Type: String per RFC 2046
Description: Content type of the data value. This attribute enables data to carry any type of content, whereby format and encoding might differ from that of the chosen event format.
For example, an event rendered using the JSON envelope format might carry an XML payload in data. The consumer is informed by this attribute being set to "application/xml".
The rules for how data content is rendered for different datacontenttype values are defined in the event format specifications. For example, the JSON event format defines the relationship in section 3.1.
For some binary mode protocol bindings, this field is directly mapped to the respective protocol’s content-type metadata property. You can find normative rules for the binary mode and the content-type metadata mapping in the respective protocol.
In some event formats, you may omit the datacontenttype attribute. For example, if a JSON format event has no datacontenttype attribute, it’s implied that the data is a JSON value conforming to the "application/json" media type. In other words: a JSON-format event with no datacontenttype is exactly equivalent to one with datacontenttype="application/json".
When translating an event message with no datacontenttype attribute to a different format or protocol binding, the target datacontenttype should be set explicitly to the implied datacontenttype of the source.
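The implied-default rule can be sketched as two small helpers. This assumes the event arrived in the JSON event format and has already been parsed into a Python dict; the function names are hypothetical.

```python
def effective_datacontenttype(event: dict) -> str:
    """Return the content type of data, applying the JSON-format default."""
    # Assumption: the event was received in the JSON event format, where a
    # missing datacontenttype implies "application/json".
    return event.get("datacontenttype", "application/json")


def with_explicit_datacontenttype(event: dict) -> dict:
    """Copy the event and set datacontenttype explicitly before translating it."""
    translated = dict(event)
    translated["datacontenttype"] = effective_datacontenttype(event)
    return translated
```

Calling with_explicit_datacontenttype before re-encoding an event for another format or protocol binding keeps the implied content type from being lost; the input event is left unmodified.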
Constraints:
For Media Type examples, see IANA Media Types
dataschema
Type: URI
Description: Identifies the schema that data adheres to. Incompatible changes to the schema should be reflected by a different URI. See Versioning of CloudEvents in the Primer for more information.
subject
Type: String
Description: This describes the event subject in the context of the event producer (identified by source). In publish-subscribe scenarios, a subscriber will typically subscribe to events emitted by a source. The source identifier alone might not be sufficient as a qualifier for any specific event if the source context has internal sub-structure.
Identifying the subject of the event in context metadata (as opposed to only in the data payload) is helpful in generic subscription filtering scenarios, where middleware is unable to interpret the data content. In the example below, the subscriber might only be interested in blobs with names ending with ‘.jpg’ or ‘.jpeg’. With the subject attribute, you can construct a simple and efficient string-suffix filter for that subset of events.
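A string-suffix filter on subject might look like the following sketch. It only inspects context metadata and never touches data; the function name wants_event is hypothetical, and the comparison is case-sensitive by assumption.

```python
IMAGE_SUFFIXES = (".jpg", ".jpeg")


def wants_event(event: dict) -> bool:
    """Return True when the event's subject names a JPEG blob."""
    # subject is optional, so treat a missing or null value as an empty string.
    subject = event.get("subject") or ""
    return subject.endswith(IMAGE_SUFFIXES)
```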
Constraints:
Example:
A subscriber might register interest for when new blobs are created inside a blob-storage container. In this case:
source identifies the subscription scope (storage container)
type identifies the “blob created” event
id uniquely identifies the event instance to distinguish separately created occurrences of a same-named blob.
The name of the newly created blob is carried in subject:
source: https://example.com/storage/tenant/container
subject: mynewfile.jpg
time
Type: Timestamp
Description: Timestamp of when the occurrence happened. If the time of the occurrence cannot be determined, this attribute may be set to some other time (such as the current time) by the CloudEvents producer. However, all producers for the same source must be consistent in this respect. In other words, either they all use the actual time of the occurrence or they all use the same algorithm to determine the value used.
Watch this video on how to use message routing with pub/sub: