This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Bindings

与外部系统交互或被外部系统触发

1 - bindings 概述

bindings API 模块的概述

通过 Dapr 的 bindings API,您可以利用外部系统的事件来触发应用程序,并与外部系统交互。使用 bindings API,您可以:

  • 避免连接到消息系统并进行轮询的复杂性(如队列和消息总线)。
  • 专注于业务逻辑,而不是系统交互的实现细节。
  • 使您的代码不依赖于特定的 SDK 或库。
  • 处理重试和故障恢复。
  • 在运行时可以切换不同的 bindings。
  • 构建具有特定环境 bindings 设置的可移植应用程序,而无需更改代码。

例如,通过 bindings,您的应用程序可以响应传入的 Twilio/SMS 消息,而无需:

  • 添加或配置第三方 Twilio SDK
  • 担心从 Twilio 轮询(或使用 WebSockets 等)
显示 bindings 的图示

在上图中:

  • 输入 binding 触发您应用程序上的一个方法。
  • 在组件上执行输出 binding 操作,例如 "create"

bindings 的开发独立于 Dapr 运行时。您可以查看并贡献 bindings

输入 bindings

通过输入 bindings,您可以在外部资源发生事件时触发您的应用程序。请求中可以发送可选的负载和元数据。

以下概述视频和演示展示了 Dapr 输入 binding 的工作原理。

要接收来自输入 binding 的事件:

  1. 定义描述 binding 类型及其元数据(如连接信息)的组件 YAML。
  2. 使用以下方式监听传入事件:
    • HTTP 端点
    • gRPC proto 库获取传入事件。

阅读使用输入 bindings 创建事件驱动应用程序指南以开始使用输入 bindings。

输出 bindings

通过输出 bindings,您可以调用外部资源。调用请求中可以发送可选的负载和元数据。

以下概述视频和演示展示了 Dapr 输出 binding 的工作原理。

要调用输出 binding:

  1. 定义描述 binding 类型及其元数据(如连接信息)的组件 YAML。
  2. 使用 HTTP 端点或 gRPC 方法调用 binding,并附带可选负载。
  3. 指定输出操作。输出操作取决于您使用的 binding 组件,可以包括:
    • "create"
    • "update"
    • "delete"
    • "exec"

阅读使用输出 bindings 与外部资源交互指南以开始使用输出 bindings。

binding 方向(可选)

您可以提供 direction 元数据字段以指示 binding 组件支持的方向。这可以使 Dapr sidecar 避免“等待应用程序准备就绪”状态,减少 Dapr sidecar 与应用程序之间的生命周期依赖:

  • "input"
  • "output"
  • "input, output"

查看 bindings direction 元数据的完整示例。

试用 bindings

快速入门和教程

想要测试 Dapr bindings API?通过以下快速入门和教程来查看 bindings 的实际应用:

快速入门/教程描述
bindings 快速入门使用输入 bindings 处理外部系统的事件,并使用输出 bindings 调用操作。
bindings 教程演示如何使用 Dapr 创建到其他组件的输入和输出 bindings。使用 bindings 连接到 Kafka。

直接在您的应用程序中开始使用 bindings

想要跳过快速入门?没问题。您可以直接在应用程序中试用 bindings 模块,以调用输出 bindings 和触发输入 bindings。在Dapr 安装完成后,您可以从输入 bindings 如何指南开始使用 bindings API。

下一步

2 - 操作指南:使用输入绑定触发应用程序

使用Dapr输入绑定触发事件驱动的应用程序

当外部资源发生事件时,您可以通过输入绑定来触发您的应用程序。外部资源可以是队列、消息管道、云服务、文件系统等。请求中可以发送可选的负载和元数据。

输入绑定非常适合用于事件驱动的处理、数据管道或一般的事件响应和后续处理。Dapr输入绑定允许您:

  • 在不需要特定SDK或库的情况下接收事件
  • 在不更改代码的情况下替换绑定
  • 专注于业务逻辑而不是事件资源的实现
示例服务的绑定图示

本指南使用Kafka绑定作为示例。您可以从绑定组件列表中找到您偏好的绑定规范。在本指南中:

  1. 示例调用/binding端点,使用checkout作为要调用的绑定名称。
  2. 负载需要放在data字段中,可以是任何可序列化为JSON的值。
  3. operation字段指定绑定需要执行的操作。例如,Kafka绑定支持create操作

创建绑定

创建一个binding.yaml文件,并保存到应用程序目录中的components子文件夹中。

创建一个名为checkout的新绑定组件。在metadata部分中,配置以下与Kafka相关的属性:

  • 您将发布消息的主题
  • 代理

在创建绑定组件时,指定绑定的支持direction

使用dapr run命令的--resources-path标志指向您的自定义资源目录。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: checkout
spec:
  type: bindings.kafka
  version: v1
  metadata:
  # Kafka代理连接设置
  - name: brokers
    value: localhost:9092
  # 消费者配置:主题和消费者组
  - name: topics
    value: sample
  - name: consumerGroup
    value: group1
  # 发布者配置:主题
  - name: publishTopic
    value: sample
  - name: authRequired
    value: false
  - name: direction
    value: input

要部署到Kubernetes集群中,运行kubectl apply -f binding.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: checkout
spec:
  type: bindings.kafka
  version: v1
  metadata:
  # Kafka代理连接设置
  - name: brokers
    value: localhost:9092
  # 消费者配置:主题和消费者组
  - name: topics
    value: sample
  - name: consumerGroup
    value: group1
  # 发布者配置:主题
  - name: publishTopic
    value: sample
  - name: authRequired
    value: false
  - name: direction
    value: input

监听传入事件(输入绑定)

配置您的应用程序以接收传入事件。如果您使用HTTP,您需要:

  • 监听一个POST端点,其名称与binding.yaml文件中的metadata.name指定的绑定名称相同。
  • 确保您的应用程序允许Dapr对该端点进行OPTIONS请求。

以下是利用Dapr SDK展示输入绑定的代码示例。

//依赖项
using System.Collections.Generic;
using System.Threading.Tasks;
using System;
using Microsoft.AspNetCore.Mvc;

//代码
namespace CheckoutService.controller
{
    [ApiController]
    public class CheckoutServiceController : Controller
    {
        [HttpPost("/checkout")]
        public ActionResult<string> getCheckout([FromBody] int orderId)
        {
            Console.WriteLine("Received Message: " + orderId);
            return "CID" + orderId;
        }
    }
}
//依赖项
import org.springframework.web.bind.annotation.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

//代码
@RestController
@RequestMapping("/")
public class CheckoutServiceController {
    private static final Logger log = LoggerFactory.getLogger(CheckoutServiceController.class);
        @PostMapping(path = "/checkout")
        public Mono<String> getCheckout(@RequestBody(required = false) byte[] body) {
            return Mono.fromRunnable(() ->
                    log.info("Received Message: " + new String(body)));
        }
}
#依赖项
import logging
from dapr.ext.grpc import App, BindingRequest

#代码
app = App()

@app.binding('checkout')
def getCheckout(request: BindingRequest):
    logging.basicConfig(level = logging.INFO)
    logging.info('Received Message : ' + request.text())

app.run(6002)
//依赖项
import (
	"encoding/json"
	"log"
	"net/http"
	"github.com/gorilla/mux"
)

//代码
func getCheckout(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	var orderId int
	err := json.NewDecoder(r.Body).Decode(&orderId)
	log.Println("Received Message: ", orderId)
	if err != nil {
		log.Printf("error parsing checkout input binding payload: %s", err)
		w.WriteHeader(http.StatusOK)
		return
	}
}

func main() {
	r := mux.NewRouter()
	r.HandleFunc("/checkout", getCheckout).Methods("POST", "OPTIONS")
	http.ListenAndServe(":6002", r)
}
//依赖项 
import { DaprServer, CommunicationProtocolEnum } from '@dapr/dapr'; 

//代码
const daprHost = "127.0.0.1"; 
const serverHost = "127.0.0.1";
const serverPort = "6002"; 
const daprPort = "3602"; 

start().catch((e) => {
    console.error(e);
    process.exit(1);
});

async function start() {
    const server = new DaprServer({
        serverHost,
        serverPort,
        communicationProtocol: CommunicationProtocolEnum.HTTP,
        clientOptions: {
            daprHost,
            daprPort, 
        }
    });
    await server.binding.receive('checkout', async (orderId) => console.log(`Received Message: ${JSON.stringify(orderId)}`));
    await server.start();
}

确认事件

通过从HTTP处理程序返回200 OK响应,告知Dapr您已成功处理应用程序中的事件。

拒绝事件

通过返回200 OK以外的任何响应,告知Dapr事件在您的应用程序中未正确处理,并安排重新投递。例如,500 Error

指定自定义路由

默认情况下,传入事件将被发送到与输入绑定名称对应的HTTP端点。您可以通过在binding.yaml中设置以下元数据属性来覆盖此设置:

name: mybinding
spec:
  type: binding.rabbitmq
  metadata:
  - name: route
    value: /onevent

事件投递保证

事件投递保证由绑定实现控制。根据绑定实现,事件投递可以是精确一次或至少一次。

参考资料

3 - 操作指南:使用输出绑定与外部资源交互

通过输出绑定调用外部系统

使用输出绑定,您可以与外部资源进行交互。在调用请求中,您可以发送可选的负载和元数据。

示例服务的绑定图示

本指南以Kafka绑定为例。您可以从绑定组件列表中选择您偏好的绑定规范。在本指南中:

  1. 示例中调用了/binding端点,使用checkout作为要调用的绑定名称。
  2. 负载放在必需的data字段中,可以是任何JSON可序列化的值。
  3. operation字段指定绑定需要执行的操作。例如,Kafka绑定支持create操作

创建绑定

创建一个binding.yaml文件,并将其保存到应用程序目录中的components子文件夹中。

创建一个名为checkout的新绑定组件。在metadata部分中,配置以下与Kafka相关的属性:

  • 您将发布消息的主题
  • 代理

在创建绑定组件时,指定绑定的支持direction

使用dapr run--resources-path标志指向您的自定义资源目录。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: checkout
spec:
  type: bindings.kafka
  version: v1
  metadata:
  # Kafka代理连接设置
  - name: brokers
    value: localhost:9092
  # 消费者配置:主题和消费者组
  - name: topics
    value: sample
  - name: consumerGroup
    value: group1
  # 发布者配置:主题
  - name: publishTopic
    value: sample
  - name: authRequired
    value: false
  - name: direction
    value: output

要将以下binding.yaml文件部署到Kubernetes集群中,运行kubectl apply -f binding.yaml

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: checkout
spec:
  type: bindings.kafka
  version: v1
  metadata:
  # Kafka代理连接设置
  - name: brokers
    value: localhost:9092
  # 消费者配置:主题和消费者组
  - name: topics
    value: sample
  - name: consumerGroup
    value: group1
  # 发布者配置:主题
  - name: publishTopic
    value: sample
  - name: authRequired
    value: false
  - name: direction
    value: output

发送事件(输出绑定)

下面的代码示例利用Dapr SDK在运行的Dapr实例上调用输出绑定端点。

//依赖项
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using System.Threading;

//代码
namespace EventService
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string BINDING_NAME = "checkout";
            string BINDING_OPERATION = "create";
            while(true)
            {
                System.Threading.Thread.Sleep(5000);
                Random random = new Random();
                int orderId = random.Next(1,1000);
                using var client = new DaprClientBuilder().Build();
                //使用Dapr SDK调用输出绑定
                await client.InvokeBindingAsync(BINDING_NAME, BINDING_OPERATION, orderId);
                Console.WriteLine("发送消息: " + orderId);
            }
        }
    }
}
//依赖项
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Random;
import java.util.concurrent.TimeUnit;

//代码
@SpringBootApplication
public class OrderProcessingServiceApplication {

	private static final Logger log = LoggerFactory.getLogger(OrderProcessingServiceApplication.class);

	public static void main(String[] args) throws InterruptedException{
		String BINDING_NAME = "checkout";
		String BINDING_OPERATION = "create";
		while(true) {
			TimeUnit.MILLISECONDS.sleep(5000);
			Random random = new Random();
			int orderId = random.nextInt(1000-1) + 1;
			DaprClient client = new DaprClientBuilder().build();
          //使用Dapr SDK调用输出绑定
			client.invokeBinding(BINDING_NAME, BINDING_OPERATION, orderId).block();
			log.info("发送消息: " + orderId);
		}
	}
}
#依赖项
import random
from time import sleep    
import requests
import logging
import json
from dapr.clients import DaprClient

#代码
logging.basicConfig(level = logging.INFO)
BINDING_NAME = 'checkout'
BINDING_OPERATION = 'create' 
while True:
    sleep(random.randrange(50, 5000) / 1000)
    orderId = random.randint(1, 1000)
    with DaprClient() as client:
        #使用Dapr SDK调用输出绑定
        resp = client.invoke_binding(BINDING_NAME, BINDING_OPERATION, json.dumps(orderId))
    logging.basicConfig(level = logging.INFO)
    logging.info('发送消息: ' + str(orderId))
    
//依赖项
import (
	"context"
	"log"
	"math/rand"
	"time"
	"strconv"
	dapr "github.com/dapr/go-sdk/client"

)

//代码
func main() {
	BINDING_NAME := "checkout";
	BINDING_OPERATION := "create";
	for i := 0; i < 10; i++ {
		time.Sleep(5000)
		orderId := rand.Intn(1000-1) + 1
		client, err := dapr.NewClient()
		if err != nil {
			panic(err)
		}
		defer client.Close()
		ctx := context.Background()
        //使用Dapr SDK调用输出绑定
		in := &dapr.InvokeBindingRequest{ Name: BINDING_NAME, Operation: BINDING_OPERATION , Data: []byte(strconv.Itoa(orderId))}
		err = client.InvokeOutputBinding(ctx, in)
		log.Println("发送消息: " + strconv.Itoa(orderId))
	}
}
    
//依赖项
import { DaprClient, CommunicationProtocolEnum } from "@dapr/dapr";

//代码
const daprHost = "127.0.0.1";

(async function () {
    for (var i = 0; i < 10; i++) {
        await sleep(2000);
        const orderId = Math.floor(Math.random() * (1000 - 1) + 1);
        try {
            await sendOrder(orderId)
        } catch (err) {
            console.error(e);
            process.exit(1);
        }
    }
})();

async function sendOrder(orderId) {
    const BINDING_NAME = "checkout";
    const BINDING_OPERATION = "create";
    const client = new DaprClient({
        daprHost,
        daprPort: process.env.DAPR_HTTP_PORT,
        communicationProtocol: CommunicationProtocolEnum.HTTP,
    });
    //使用Dapr SDK调用输出绑定
    const result = await client.binding.send(BINDING_NAME, BINDING_OPERATION, orderId);
    console.log("发送消息: " + orderId);
}

function sleep(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
}

您还可以使用HTTP调用输出绑定端点:

curl -X POST -H 'Content-Type: application/json' http://localhost:3601/v1.0/bindings/checkout -d '{ "data": 100, "operation": "create" }'

观看此视频以了解如何使用双向输出绑定。

参考资料