此端点允许您将数据发布到多个正在监听某个 topic
的消费者。Dapr 保证此端点至少会被调用一次。
POST http://localhost:<daprPort>/v1.0/publish/<pubsubname>/<topic>[?<metadata>]
代码 | 描述 |
---|---|
204 | 消息已送达 |
403 | 访问控制禁止消息 |
404 | 未提供 pubsub 名称或主题 |
500 | 传递失败 |
参数 | 描述 |
---|---|
daprPort | Dapr 端口 |
pubsubname | pubsub 组件的名称 |
topic | 主题的名称 |
metadata | 查询参数,用于元数据,如下所述 |
注意,所有 URL 参数区分大小写。
curl -X POST http://localhost:3500/v1.0/publish/pubsubName/deathStarStatus \
-H "Content-Type: application/json" \
-d '{
"status": "completed"
}'
Content-Type
头部告知 Dapr 在构建 CloudEvent 信封时您的数据遵循哪种内容类型。Content-Type
头部的值填充 CloudEvent 中的 datacontenttype
字段。
除非指定,否则 Dapr 假定为 text/plain
。如果您的内容类型是 JSON,请使用值为 application/json
的 Content-Type
头部。
如果您想发送自定义的 CloudEvent,请为 Content-Type
头部使用 application/cloudevents+json
值。
元数据可以通过请求 URL 中的查询参数发送。它必须以 metadata.
为前缀,如下所示。
参数 | 描述 |
---|---|
metadata.ttlInSeconds | 消息过期的秒数,如此处所述 |
metadata.rawPayload | 布尔值,决定 Dapr 是否应在不将事件包装为 CloudEvent 的情况下发布事件,如此处所述 |
根据每个 pubsub 组件,还可以使用其他元数据参数。
此端点允许您将多条消息发布到正在监听某个 topic
的消费者。
POST http://localhost:<daprPort>/v1.0-alpha1/publish/bulk/<pubsubname>/<topic>[?<metadata>]
请求体应包含一个 JSON 数组,其中包含:
如果事件的内容类型不是 application/cloudevents+json
,则会自动包装为 CloudEvent(除非 metadata.rawPayload
设置为 true
)。
示例:
curl -X POST http://localhost:3500/v1.0-alpha1/publish/bulk/pubsubName/deathStarStatus \
-H 'Content-Type: application/json' \
-d '[
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"event": "first text message",
"contentType": "text/plain"
},
{
"entryId": "b1f40bd6-4af2-11ed-b878-0242ac120002",
"event": {
"message": "second JSON message"
},
"contentType": "application/json"
},
]'
由于请求体是一个 JSON 数组,因此 Content-Type
头部应始终设置为 application/json
。
参数 | 描述 |
---|---|
daprPort | Dapr 端口 |
pubsubname | 发布/订阅组件的名称 |
topic | 主题的名称 |
metadata | 元数据的查询参数 |
元数据可以通过请求 URL 中的查询参数发送。它必须以 metadata.
为前缀,如下表所示。
参数 | 描述 |
---|---|
metadata.rawPayload | 布尔值,决定 Dapr 是否应在不将消息包装为 CloudEvent 的情况下发布消息。 |
metadata.maxBulkPubBytes | 批量发布请求中要发布的最大字节数。 |
HTTP 状态 | 描述 |
---|---|
204 | 所有消息已送达 |
400 | 发布/订阅不存在 |
403 | 访问控制禁止 |
500 | 至少一条消息未能送达 |
如果状态码为 500,响应体将包含一个 JSON 对象,其中包含未能送达的条目列表。例如,在我们上面的请求中,如果事件 "first text message"
的条目未能送达,响应将包含其条目 ID 和来自底层 pub/sub 组件的错误消息。
{
"failedEntries": [
{
"entryId": "ae6bf7c6-4af2-11ed-b878-0242ac120002",
"error": "some error message"
},
],
"errorCode": "ERR_PUBSUB_PUBLISH_MESSAGE"
}
Dapr 将调用用户代码上的以下端点以发现主题订阅:
GET http://localhost:<appPort>/dapr/subscribe
参数 | 描述 |
---|---|
appPort | 应用程序端口 |
一个 JSON 编码的字符串数组。
示例:
[
{
"pubsubname": "pubsub",
"topic": "newOrder",
"routes": {
"rules": [
{
"match": "event.type == order",
"path": "/orders"
}
]
"default" : "/otherorders"
},
"metadata": {
"rawPayload": "true"
}
}
]
注意,所有订阅参数区分大小写。
可选地,元数据可以通过请求体发送。
参数 | 描述 |
---|---|
rawPayload | 布尔值,订阅不符合 CloudEvent 规范的事件,如此处所述 |
为了传递主题事件,将使用订阅响应中指定的路由对用户代码进行 POST
调用。在 routes
下,您可以提供在接收到消息主题时匹配特定条件到特定路径的规则。 您还可以为没有特定匹配的任何规则提供默认路由。
以下示例说明了这一点,考虑到主题 newOrder
的订阅和端口 3000 上的路由 orders
:POST http://localhost:3000/orders
POST http://localhost:<appPort>/<path>
注意,所有 URL 参数区分大小写。
参数 | 描述 |
---|---|
appPort | 应用程序端口 |
path | 订阅配置中的路由路径 |
HTTP 2xx 响应表示消息处理成功。
为了更丰富的响应处理,可以发送一个带有处理状态的 JSON 编码的负载体:
{
"status": "<status>"
}
状态 | 描述 |
---|---|
SUCCESS | 消息处理成功 |
RETRY | 消息由 Dapr 重试 |
DROP | 记录警告并丢弃消息 |
其他 | 错误,消息由 Dapr 重试 |
Dapr 假定没有 status
字段的 JSON 编码负载响应或带有 HTTP 2xx 的空负载响应为 SUCCESS
。
HTTP 响应可能与 HTTP 2xx 不同。以下是 Dapr 在不同 HTTP 状态下的行为:
HTTP 状态 | 描述 |
---|---|
2xx | 消息根据负载中的状态处理(如果为空则为 SUCCESS ;如果负载无效则忽略)。 |
404 | 记录错误并丢弃消息 |
其他 | 记录警告并重试消息 |
这允许您在监听某个 topic
时从代理订阅多条消息。
为了以批量方式接收主题订阅中的消息,应用程序:
bulkSubscribe
maxMessagesCount
和/或 maxAwaitDurationMs
有关如何选择的更多详细信息,请参阅批量发送和接收消息指南。HTTP 2xx 响应表示应用程序已处理此批量消息中的条目(单个消息),Dapr 现在将检查每个 EntryId 状态。 需要发送一个带有每个条目处理状态的 JSON 编码负载体:
{
"statuses":
[
{
"entryId": "<entryId1>",
"status": "<status>"
},
{
"entryId": "<entryId2>",
"status": "<status>"
}
]
}
注意:如果 Dapr 在从应用程序接收到的响应中找不到 EntryId 状态,则该条目的状态被视为
RETRY
。
状态 | 描述 |
---|---|
SUCCESS | 消息处理成功 |
RETRY | 消息由 Dapr 重试 |
DROP | 记录警告并丢弃消息 |
HTTP 响应可能与 HTTP 2xx 不同。以下是 Dapr 在不同 HTTP 状态下的行为:
HTTP 状态 | 描述 |
---|---|
2xx | 消息根据负载中的状态处理。 |
404 | 记录错误并丢弃所有消息 |
其他 | 记录警告并重试所有消息 |
Dapr 发布/订阅遵循 CloudEvents 1.0 版本。