This is the multi-page printable view of this section. Click here to print.

Return to the regular view of this page.

Dapr Java SDK

Java SDK包,用于开发Dapr应用

Dapr 提供多种包以帮助开发 Java 应用程序。通过这些包,您可以使用 Dapr 创建 Java 客户端、服务器和虚拟 actor。

前提条件

导入 Dapr Java SDK

接下来,导入 Java SDK 包以开始使用。选择您喜欢的构建工具以了解如何导入。

对于 Maven 项目,将以下内容添加到您的 pom.xml 文件中:

<project>
  ...
  <dependencies>
    ...
    <!-- Dapr 的核心 SDK,包含所有功能,actor 除外。 -->
    <dependency>
      <groupId>io.dapr</groupId>
      <artifactId>dapr-sdk</artifactId>
      <version>1.13.1</version>
    </dependency>
    <!-- Dapr 的 actor SDK(可选)。 -->
    <dependency>
      <groupId>io.dapr</groupId>
      <artifactId>dapr-sdk-actors</artifactId>
      <version>1.13.1</version>
    </dependency>
    <!-- Dapr 与 SpringBoot 的 SDK 集成(可选)。 -->
    <dependency>
      <groupId>io.dapr</groupId>
      <artifactId>dapr-sdk-springboot</artifactId>
      <version>1.13.1</version>
    </dependency>
    ...
  </dependencies>
  ...
</project>

对于 Gradle 项目,将以下内容添加到您的 build.gradle 文件中:

dependencies {
...
    // Dapr 的核心 SDK,包含所有功能,actor 除外。
    compile('io.dapr:dapr-sdk:1.13.1')
    // Dapr 的 actor SDK(可选)。
    compile('io.dapr:dapr-sdk-actors:1.13.1')
    // Dapr 与 SpringBoot 的 SDK 集成(可选)。
    compile('io.dapr:dapr-sdk-springboot:1.13.1')
}

如果您也在使用 Spring Boot,可能会遇到一个常见问题,即 Dapr SDK 使用的 OkHttp 版本与 Spring Boot Bill of Materials 中指定的版本冲突。

您可以通过在项目中指定与 Dapr SDK 使用的版本兼容的 OkHttp 版本来解决此问题:

<dependency>
  <groupId>com.squareup.okhttp3</groupId>
  <artifactId>okhttp</artifactId>
  <version>1.13.1</version>
</dependency>

试用

测试 Dapr Java SDK。通过 Java 快速入门和教程来查看 Dapr 的实际应用:

SDK 示例描述
快速入门使用 Java SDK 在几分钟内体验 Dapr 的 API 构建块。
SDK 示例克隆 SDK 仓库以尝试一些示例并开始使用。
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;

try (DaprClient client = (new DaprClientBuilder()).build()) {
  // 发送带有消息的类;BINDING_OPERATION="create"
  client.invokeBinding(BINDING_NAME, BINDING_OPERATION, myClass).block();

  // 发送纯字符串
  client.invokeBinding(BINDING_NAME, BINDING_OPERATION, message).block();
}

可用包

客户端

创建与 Dapr sidecar 和其他 Dapr 应用程序交互的 Java 客户端。

工作流

创建和管理与其他 Dapr API 一起使用的工作流。

1 - 工作流

如何使用 Dapr 工作流扩展快速启动和运行

在这篇文档中,我们将介绍如何使用 Dapr 工作流扩展来快速启动和运行工作流。Dapr 工作流扩展为定义和管理工作流提供了一种简便的方法,使开发者能够轻松地在分布式系统中编排复杂的业务流程。

Dapr 工作流扩展的主要功能包括:

  • 工作流定义:可以使用 YAML 或 JSON 格式来定义工作流。
  • 状态管理:通过 Dapr 的状态管理功能,确保工作流在执行过程中状态的正确维护。
  • 服务调用:利用 Dapr 的服务调用功能,在工作流中与其他服务进行交互。
  • 事件驱动:通过发布/订阅机制,工作流可以响应事件并触发相应的操作。
  • 定时器和提醒:使用定时器和提醒功能,在工作流中设置定时任务和提醒。

通过这些功能,Dapr 工作流扩展能够帮助开发者更高效地构建和管理分布式应用程序中的工作流。

1.1 - 如何:在 Java SDK 中编写和管理 Dapr 工作流

如何使用 Dapr Java SDK 快速启动和运行工作流

我们来创建一个 Dapr 工作流,并通过控制台调用它。通过提供的工作流示例,您将:

自托管模式下,此示例使用 dapr init 的默认配置运行。

准备工作

  • 确保您使用的是最新版本的 proto 绑定

设置环境

克隆 Java SDK 仓库并进入其中。

git clone https://github.com/dapr/java-sdk.git
cd java-sdk

运行以下命令以安装运行此工作流示例所需的 Dapr Java SDK 依赖项。

mvn clean install

从 Java SDK 根目录,导航到 Dapr 工作流示例。

cd examples

运行 DemoWorkflowWorker

DemoWorkflowWorker 类在 Dapr 的工作流运行时引擎中注册了 DemoWorkflow 的实现。在 DemoWorkflowWorker.java 文件中,您可以找到 DemoWorkflowWorker 类和 main 方法:

public class DemoWorkflowWorker {

  public static void main(String[] args) throws Exception {
    // Register the Workflow with the runtime.
    WorkflowRuntime.getInstance().registerWorkflow(DemoWorkflow.class);
    System.out.println("Start workflow runtime");
    WorkflowRuntime.getInstance().startAndBlock();
    System.exit(0);
  }
}

在上面的代码中:

  • WorkflowRuntime.getInstance().registerWorkflow()DemoWorkflow 注册为 Dapr 工作流运行时中的一个工作流。
  • WorkflowRuntime.getInstance().start() 在 Dapr 工作流运行时中构建并启动引擎。

在终端中,执行以下命令以启动 DemoWorkflowWorker

dapr run --app-id demoworkflowworker --resources-path ./components/workflows --dapr-grpc-port 50001 -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.DemoWorkflowWorker

预期输出

You're up and running! Both Dapr and your app logs will appear here.

...

== APP == Start workflow runtime
== APP == Sep 13, 2023 9:02:03 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock
== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.

运行 DemoWorkflowClient

DemoWorkflowClient 启动已在 Dapr 中注册的工作流实例。

public class DemoWorkflowClient {

  // ...
  public static void main(String[] args) throws InterruptedException {
    DaprWorkflowClient client = new DaprWorkflowClient();

    try (client) {
      String separatorStr = "*******";
      System.out.println(separatorStr);
      String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");
      System.out.printf("Started new workflow instance with random ID: %s%n", instanceId);

      System.out.println(separatorStr);
      System.out.println("**GetInstanceMetadata:Running Workflow**");
      WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true);
      System.out.printf("Result: %s%n", workflowMetadata);

      System.out.println(separatorStr);
      System.out.println("**WaitForInstanceStart**");
      try {
        WorkflowInstanceStatus waitForInstanceStartResult =
            client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
        System.out.printf("Result: %s%n", waitForInstanceStartResult);
      } catch (TimeoutException ex) {
        System.out.printf("waitForInstanceStart has an exception:%s%n", ex);
      }

      System.out.println(separatorStr);
      System.out.println("**SendExternalMessage**");
      client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");

      System.out.println(separatorStr);
      System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **");
      client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
      client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
      client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");
      System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId);

      System.out.println(separatorStr);
      System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **");
      client.raiseEvent(instanceId, "e2", "event 2 Payload");
      System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId);

      System.out.println(separatorStr);
      System.out.println("**WaitForInstanceCompletion**");
      try {
        WorkflowInstanceStatus waitForInstanceCompletionResult =
            client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
        System.out.printf("Result: %s%n", waitForInstanceCompletionResult);
      } catch (TimeoutException ex) {
        System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex);
      }

      System.out.println(separatorStr);
      System.out.println("**purgeInstance**");
      boolean purgeResult = client.purgeInstance(instanceId);
      System.out.printf("purgeResult: %s%n", purgeResult);

      System.out.println(separatorStr);
      System.out.println("**raiseEvent**");

      String eventInstanceId = client.scheduleNewWorkflow(DemoWorkflow.class);
      System.out.printf("Started new workflow instance with random ID: %s%n", eventInstanceId);
      client.raiseEvent(eventInstanceId, "TestException", null);
      System.out.printf("Event raised for workflow with instanceId: %s\n", eventInstanceId);

      System.out.println(separatorStr);
      String instanceToTerminateId = "terminateMe";
      client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId);
      System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId);

      TimeUnit.SECONDS.sleep(5);
      System.out.println("Terminate this workflow instance manually before the timeout is reached");
      client.terminateWorkflow(instanceToTerminateId, null);
      System.out.println(separatorStr);

      String restartingInstanceId = "restarting";
      client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId);
      System.out.printf("Started new  workflow instance with ID: %s%n", restartingInstanceId);
      System.out.println("Sleeping 30 seconds to restart the workflow");
      TimeUnit.SECONDS.sleep(30);

      System.out.println("**SendExternalMessage: RestartEvent**");
      client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");

      System.out.println("Sleeping 30 seconds to terminate the eternal workflow");
      TimeUnit.SECONDS.sleep(30);
      client.terminateWorkflow(restartingInstanceId, null);
    }

    System.out.println("Exiting DemoWorkflowClient.");
    System.exit(0);
  }
}

在第二个终端窗口中,通过运行以下命令启动工作流:

java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.workflows.DemoWorkflowClient

预期输出

*******
Started new workflow instance with random ID: 0b4cc0d5-413a-4c1c-816a-a71fa24740d4
*******
**GetInstanceMetadata:Running Workflow**
Result: [Name: 'io.dapr.examples.workflows.DemoWorkflow', ID: '0b4cc0d5-413a-4c1c-816a-a71fa24740d4', RuntimeStatus: RUNNING, CreatedAt: 2023-09-13T13:02:30.547Z, LastUpdatedAt: 2023-09-13T13:02:30.699Z, Input: '"input data"', Output: '']
*******
**WaitForInstanceStart**
Result: [Name: 'io.dapr.examples.workflows.DemoWorkflow', ID: '0b4cc0d5-413a-4c1c-816a-a71fa24740d4', RuntimeStatus: RUNNING, CreatedAt: 2023-09-13T13:02:30.547Z, LastUpdatedAt: 2023-09-13T13:02:30.699Z, Input: '"input data"', Output: '']
*******
**SendExternalMessage**
*******
** Registering parallel Events to be captured by allOf(t1,t2,t3) **
Events raised for workflow with instanceId: 0b4cc0d5-413a-4c1c-816a-a71fa24740d4
*******
** Registering Event to be captured by anyOf(t1,t2,t3) **
Event raised for workflow with instanceId: 0b4cc0d5-413a-4c1c-816a-a71fa24740d4
*******
**WaitForInstanceCompletion**
Result: [Name: 'io.dapr.examples.workflows.DemoWorkflow', ID: '0b4cc0d5-413a-4c1c-816a-a71fa24740d4', RuntimeStatus: FAILED, CreatedAt: 2023-09-13T13:02:30.547Z, LastUpdatedAt: 2023-09-13T13:02:55.054Z, Input: '"input data"', Output: '']
*******
**purgeInstance**
purgeResult: true
*******
**raiseEvent**
Started new workflow instance with random ID: 7707d141-ebd0-4e54-816e-703cb7a52747
Event raised for workflow with instanceId: 7707d141-ebd0-4e54-816e-703cb7a52747
*******
Started new workflow instance with specified ID: terminateMe
Terminate this workflow instance manually before the timeout is reached
*******
Started new  workflow instance with ID: restarting
Sleeping 30 seconds to restart the workflow
**SendExternalMessage: RestartEvent**
Sleeping 30 seconds to terminate the eternal workflow
Exiting DemoWorkflowClient.

发生了什么?

  1. 当您运行 dapr run 时,工作流工作者将工作流(DemoWorkflow)及其活动注册到 Dapr 工作流引擎。
  2. 当您运行 java 时,工作流客户端启动了具有以下活动的工作流实例。您可以在运行 dapr run 的终端中查看输出。
    1. 工作流启动,触发三个并行任务,并等待它们完成。
    2. 工作流客户端调用活动并将 “Hello Activity” 消息发送到控制台。
    3. 工作流超时并被清除。
    4. 工作流客户端启动一个具有随机 ID 的新工作流实例,使用另一个名为 terminateMe 的工作流实例终止它,并使用名为 restarting 的工作流重新启动它。
    5. 然后工作流客户端退出。

下一步

2 - 使用 Dapr 客户端 Java SDK 入门

如何使用 Dapr Java SDK 快速上手

Dapr 客户端包使您能够从 Java 应用程序与其他 Dapr 应用程序进行交互。

前提条件

完成初始设置并将 Java SDK 导入您的项目

初始化客户端

您可以这样初始化 Dapr 客户端:

DaprClient client = new DaprClientBuilder().build();

这会连接到默认的 Dapr gRPC 端点 localhost:50001

环境变量

Dapr Sidecar 端点

您可以使用标准化的 DAPR_GRPC_ENDPOINTDAPR_HTTP_ENDPOINT 环境变量来指定不同的 gRPC 或 HTTP 端点。当设置了这些变量时,客户端将自动使用它们连接到 Dapr sidecar。

旧的环境变量 DAPR_HTTP_PORTDAPR_GRPC_PORT 仍然受支持,但 DAPR_GRPC_ENDPOINTDAPR_HTTP_ENDPOINT 优先。

Dapr API 令牌

如果您的 Dapr 实例需要 DAPR_API_TOKEN 环境变量,您可以在环境中设置,客户端会自动使用。
您可以在这里阅读更多关于 Dapr API 令牌认证的信息。

错误处理

最初,Dapr 中的错误遵循标准 gRPC 错误模型。为了提供更详细的信息,在 1.13 版本中引入了一个增强的错误模型,与 gRPC 的丰富错误模型对齐。Java SDK 扩展了 DaprException,以包含 Dapr 中添加的错误详细信息。

使用 Dapr Java SDK 处理 DaprException 并消费错误详细信息的示例:

...
      try {
        client.publishEvent("unknown_pubsub", "mytopic", "mydata").block();
      } catch (DaprException exception) {
        System.out.println("Dapr 异常的错误代码: " + exception.getErrorCode());
        System.out.println("Dapr 异常的消息: " + exception.getMessage());
        // DaprException 现在通过 `getStatusDetails()` 提供来自 Dapr 运行时的更多错误详细信息。
        System.out.println("Dapr 异常的原因: " + exception.getStatusDetails().get(
        DaprErrorDetails.ErrorDetailType.ERROR_INFO,
            "reason",
        TypeRef.STRING));
      }
...

构建块

Java SDK 允许您与所有 Dapr 构建块进行接口交互。

调用服务

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;

try (DaprClient client = (new DaprClientBuilder()).build()) {
  // 调用 'GET' 方法 (HTTP) 跳过序列化: 返回类型为 Mono<byte[]>
  // 对于 gRPC 设置 HttpExtension.NONE 参数
  response = client.invokeMethod(SERVICE_TO_INVOKE, METHOD_TO_INVOKE, "{\"name\":\"World!\"}", HttpExtension.GET, byte[].class).block();

  // 调用 'POST' 方法 (HTTP) 跳过序列化: 返回类型为 Mono<byte[]>     
  response = client.invokeMethod(SERVICE_TO_INVOKE, METHOD_TO_INVOKE, "{\"id\":\"100\", \"FirstName\":\"Value\", \"LastName\":\"Value\"}", HttpExtension.POST, byte[].class).block();

  System.out.println(new String(response));

  // 调用 'POST' 方法 (HTTP) 带序列化: 返回类型为 Mono<Employee>      
  Employee newEmployee = new Employee("Nigel", "Guitarist");
  Employee employeeResponse = client.invokeMethod(SERVICE_TO_INVOKE, "employees", newEmployee, HttpExtension.POST, Employee.class).block();
}

保存和获取应用程序状态

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.State;
import reactor.core.publisher.Mono;

try (DaprClient client = (new DaprClientBuilder()).build()) {
  // 保存状态
  client.saveState(STATE_STORE_NAME, FIRST_KEY_NAME, myClass).block();

  // 获取状态
  State<MyClass> retrievedMessage = client.getState(STATE_STORE_NAME, FIRST_KEY_NAME, MyClass.class).block();

  // 删除状态
  client.deleteState(STATE_STORE_NAME, FIRST_KEY_NAME).block();
}

发布和订阅消息

发布消息
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.Metadata;
import static java.util.Collections.singletonMap;

try (DaprClient client = (new DaprClientBuilder()).build()) {
  client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message, singletonMap(Metadata.TTL_IN_SECONDS, MESSAGE_TTL_IN_SECONDS)).block();
}
订阅消息
import com.fasterxml.jackson.databind.ObjectMapper;
import io.dapr.Topic;
import io.dapr.client.domain.BulkSubscribeAppResponse;
import io.dapr.client.domain.BulkSubscribeAppResponseEntry;
import io.dapr.client.domain.BulkSubscribeAppResponseStatus;
import io.dapr.client.domain.BulkSubscribeMessage;
import io.dapr.client.domain.BulkSubscribeMessageEntry;
import io.dapr.client.domain.CloudEvent;
import io.dapr.springboot.annotations.BulkSubscribe;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
public class SubscriberController {

  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
  @PostMapping(path = "/testingtopic")
  public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<?> cloudEvent) {
    return Mono.fromRunnable(() -> {
      try {
        System.out.println("Subscriber got: " + cloudEvent.getData());
        System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
  }

  @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
          rule = @Rule(match = "event.type == 'myevent.v2'", priority = 1))
  @PostMapping(path = "/testingtopicV2")
  public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent envelope) {
    return Mono.fromRunnable(() -> {
      try {
        System.out.println("Subscriber got: " + cloudEvent.getData());
        System.out.println("Subscriber got: " + OBJECT_MAPPER.writeValueAsString(cloudEvent));
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
  }

  @BulkSubscribe()
  @Topic(name = "testingtopicbulk", pubsubName = "${myAppProperty:messagebus}")
  @PostMapping(path = "/testingtopicbulk")
  public Mono<BulkSubscribeAppResponse> handleBulkMessage(
          @RequestBody(required = false) BulkSubscribeMessage<CloudEvent<String>> bulkMessage) {
    return Mono.fromCallable(() -> {
      if (bulkMessage.getEntries().size() == 0) {
        return new BulkSubscribeAppResponse(new ArrayList<BulkSubscribeAppResponseEntry>());
      }

      System.out.println("Bulk Subscriber received " + bulkMessage.getEntries().size() + " messages.");

      List<BulkSubscribeAppResponseEntry> entries = new ArrayList<BulkSubscribeAppResponseEntry>();
      for (BulkSubscribeMessageEntry<?> entry : bulkMessage.getEntries()) {
        try {
          System.out.printf("Bulk Subscriber message has entry ID: %s\n", entry.getEntryId());
          CloudEvent<?> cloudEvent = (CloudEvent<?>) entry.getEvent();
          System.out.printf("Bulk Subscriber got: %s\n", cloudEvent.getData());
          entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.SUCCESS));
        } catch (Exception e) {
          e.printStackTrace();
          entries.add(new BulkSubscribeAppResponseEntry(entry.getEntryId(), BulkSubscribeAppResponseStatus.RETRY));
        }
      }
      return new BulkSubscribeAppResponse(entries);
    });
  }
}
批量发布消息

注意: API 处于 Alpha 阶段

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.BulkPublishResponse;
import io.dapr.client.domain.BulkPublishResponseFailedEntry;
import java.util.ArrayList;
import java.util.List;
class Solution {
  public void publishMessages() {
    try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
      // 创建要发布的消息列表
      List<String> messages = new ArrayList<>();
      for (int i = 0; i < NUM_MESSAGES; i++) {
        String message = String.format("This is message #%d", i);
        messages.add(message);
        System.out.println("Going to publish message : " + message);
      }

      // 使用批量发布 API 发布消息列表
      BulkPublishResponse<String> res = client.publishEvents(PUBSUB_NAME, TOPIC_NAME, "text/plain", messages).block()
    }
  }
}

与输出绑定交互

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;

try (DaprClient client = (new DaprClientBuilder()).build()) {
  // 发送带有消息的类; BINDING_OPERATION="create"
  client.invokeBinding(BINDING_NAME, BINDING_OPERATION, myClass).block();

  // 发送纯字符串
  client.invokeBinding(BINDING_NAME, BINDING_OPERATION, message).block();
}

与输入绑定交互

import org.springframework.web.bind.annotation.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RestController
@RequestMapping("/")
public class myClass {
    private static final Logger log = LoggerFactory.getLogger(myClass);
        @PostMapping(path = "/checkout")
        public Mono<String> getCheckout(@RequestBody(required = false) byte[] body) {
            return Mono.fromRunnable(() ->
                    log.info("Received Message: " + new String(body)));
        }
}

检索秘密

import com.fasterxml.jackson.databind.ObjectMapper;
importio.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import java.util.Map;

try (DaprClient client = (new DaprClientBuilder()).build()) {
  Map<String, String> secret = client.getSecret(SECRET_STORE_NAME, secretKey).block();
  System.out.println(JSON_SERIALIZER.writeValueAsString(secret));
}

Actors

actor 是一个具有单线程执行的隔离、独立的计算和状态单元。Dapr 提供了一种基于 虚拟 actor 模式 的 actor 实现,该模式提供了单线程编程模型,并且当 actor 不在使用时会被垃圾回收。使用 Dapr 的实现,您可以根据 actor 模型编写 Dapr actor,Dapr 利用底层平台提供的可扩展性和可靠性。

import io.dapr.actors.ActorMethod;
import io.dapr.actors.ActorType;
import reactor.core.publisher.Mono;

@ActorType(name = "DemoActor")
public interface DemoActor {

  void registerReminder();

  @ActorMethod(name = "echo_message")
  String say(String something);

  void clock(String message);

  @ActorMethod(returns = Integer.class)
  Mono<Integer> incrementAndGet(int delta);
}

获取和订阅应用程序配置

注意这是一个预览 API,因此只能通过 DaprPreviewClient 接口访问,而不能通过普通的 DaprClient 接口访问

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.ConfigurationItem;
import io.dapr.client.domain.GetConfigurationRequest;
import io.dapr.client.domain.SubscribeConfigurationRequest;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
  // 获取单个键的配置
  Mono<ConfigurationItem> item = client.getConfiguration(CONFIG_STORE_NAME, CONFIG_KEY).block();

  // 获取多个键的配置
  Mono<Map<String, ConfigurationItem>> items =
          client.getConfiguration(CONFIG_STORE_NAME, CONFIG_KEY_1, CONFIG_KEY_2);

  // 订阅配置更改
  Flux<SubscribeConfigurationResponse> outFlux = client.subscribeConfiguration(CONFIG_STORE_NAME, CONFIG_KEY_1, CONFIG_KEY_2);
  outFlux.subscribe(configItems -> configItems.forEach(...));

  // 取消订阅配置更改
  Mono<UnsubscribeConfigurationResponse> unsubscribe = client.unsubscribeConfiguration(SUBSCRIPTION_ID, CONFIG_STORE_NAME)
}

查询保存的状态

注意这是一个预览 API,因此只能通过 DaprPreviewClient 接口访问,而不能通过普通的 DaprClient 接口访问

import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.QueryStateItem;
import io.dapr.client.domain.QueryStateRequest;
import io.dapr.client.domain.QueryStateResponse;
import io.dapr.client.domain.query.Query;
import io.dapr.client.domain.query.Sorting;
import io.dapr.client.domain.query.filters.EqFilter;

try (DaprClient client = builder.build(); DaprPreviewClient previewClient = builder.buildPreviewClient()) {
        String searchVal = args.length == 0 ? "searchValue" : args[0];
        
        // 创建 JSON 数据
        Listing first = new Listing();
        first.setPropertyType("apartment");
        first.setId("1000");
        ...
        Listing second = new Listing();
        second.setPropertyType("row-house");
        second.setId("1002");
        ...
        Listing third = new Listing();
        third.setPropertyType("apartment");
        third.setId("1003");
        ...
        Listing fourth = new Listing();
        fourth.setPropertyType("apartment");
        fourth.setId("1001");
        ...
        Map<String, String> meta = new HashMap<>();
        meta.put("contentType", "application/json");

        // 保存状态
        SaveStateRequest request = new SaveStateRequest(STATE_STORE_NAME).setStates(
          new State<>("1", first, null, meta, null),
          new State<>("2", second, null, meta, null),
          new State<>("3", third, null, meta, null),
          new State<>("4", fourth, null, meta, null)
        );
        client.saveBulkState(request).block();
        
        
        // 创建查询和查询状态请求

        Query query = new Query()
          .setFilter(new EqFilter<>("propertyType", "apartment"))
          .setSort(Arrays.asList(new Sorting("id", Sorting.Order.DESC)));
        QueryStateRequest request = new QueryStateRequest(STATE_STORE_NAME)
          .setQuery(query);

        // 使用预览客户端调用查询状态 API
        QueryStateResponse<MyData> result = previewClient.queryState(request, MyData.class).block();
        
        // 查看查询状态响应 
        System.out.println("Found " + result.getResults().size() + " items.");
        for (QueryStateItem<Listing> item : result.getResults()) {
          System.out.println("Key: " + item.getKey());
          System.out.println("Data: " + item.getValue());
        }
}

分布式锁

package io.dapr.examples.lock.grpc;

import io.dapr.client.DaprClientBuilder;
import io.dapr.client.DaprPreviewClient;
import io.dapr.client.domain.LockRequest;
import io.dapr.client.domain.UnlockRequest;
import io.dapr.client.domain.UnlockResponseStatus;
import reactor.core.publisher.Mono;

public class DistributedLockGrpcClient {
  private static final String LOCK_STORE_NAME = "lockstore";

  /**
   * 执行各种方法以检查不同的 API。
   *
   * @param args 参数
   * @throws Exception 抛出异常
   */
  public static void main(String[] args) throws Exception {
    try (DaprPreviewClient client = (new DaprClientBuilder()).buildPreviewClient()) {
      System.out.println("Using preview client...");
      tryLock(client);
      unlock(client);
    }
  }

  /**
   * 尝试获取锁。
   *
   * @param client DaprPreviewClient 对象
   */
  public static void tryLock(DaprPreviewClient client) {
    System.out.println("*******尝试获取一个空闲的分布式锁********");
    try {
      LockRequest lockRequest = new LockRequest(LOCK_STORE_NAME, "resouce1", "owner1", 5);
      Mono<Boolean> result = client.tryLock(lockRequest);
      System.out.println("Lock result -> " + (Boolean.TRUE.equals(result.block()) ? "SUCCESS" : "FAIL"));
    } catch (Exception ex) {
      System.out.println(ex.getMessage());
    }
  }

  /**
   * 解锁。
   *
   * @param client DaprPreviewClient 对象
   */
  public static void unlock(DaprPreviewClient client) {
    System.out.println("*******解锁一个分布式锁********");
    try {
      UnlockRequest unlockRequest = new UnlockRequest(LOCK_STORE_NAME, "resouce1", "owner1");
      Mono<UnlockResponseStatus> result = client.unlock(unlockRequest);
      System.out.println("Unlock result ->" + result.block().name());
    } catch (Exception ex) {
      System.out.println(ex.getMessage());
    }
  }
}

工作流

package io.dapr.examples.workflows;

import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * 有关设置说明,请参阅 README。
 */
public class DemoWorkflowClient {

  /**
   * 主方法。
   *
   * @param args 输入参数(未使用)。
   * @throws InterruptedException 如果程序被中断。
   */
  public static void main(String[] args) throws InterruptedException {
    DaprWorkflowClient client = new DaprWorkflowClient();

    try (client) {
      String separatorStr = "*******";
      System.out.println(separatorStr);
      String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");
      System.out.printf("Started new workflow instance with random ID: %s%n", instanceId);

      System.out.println(separatorStr);
      System.out.println("**GetInstanceMetadata:Running Workflow**");
      WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true);
      System.out.printf("Result: %s%n", workflowMetadata);

      System.out.println(separatorStr);
      System.out.println("**WaitForInstanceStart**");
      try {
        WorkflowInstanceStatus waitForInstanceStartResult =
            client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
        System.out.printf("Result: %s%n", waitForInstanceStartResult);
      } catch (TimeoutException ex) {
        System.out.printf("waitForInstanceStart has an exception:%s%n", ex);
      }

      System.out.println(separatorStr);
      System.out.println("**SendExternalMessage**");
      client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");

      System.out.println(separatorStr);
      System.out.println("** Registering parallel Events to be captured by allOf(t1,t2,t3) **");
      client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
      client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
      client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");
      System.out.printf("Events raised for workflow with instanceId: %s\n", instanceId);

      System.out.println(separatorStr);
      System.out.println("** Registering Event to be captured by anyOf(t1,t2,t3) **");
      client.raiseEvent(instanceId, "e2", "event 2 Payload");
      System.out.printf("Event raised for workflow with instanceId: %s\n", instanceId);


      System.out.println(separatorStr);
      System.out.println("**WaitForInstanceCompletion**");
      try {
        WorkflowInstanceStatus waitForInstanceCompletionResult =
            client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
        System.out.printf("Result: %s%n", waitForInstanceCompletionResult);
      } catch (TimeoutException ex) {
        System.out.printf("waitForInstanceCompletion has an exception:%s%n", ex);
      }

      System.out.println(separatorStr);
      System.out.println("**purgeInstance**");
      boolean purgeResult = client.purgeInstance(instanceId);
      System.out.printf("purgeResult: %s%n", purgeResult);

      System.out.println(separatorStr);
      System.out.println("**raiseEvent**");

      String eventInstanceId = client.scheduleNewWorkflow(DemoWorkflow.class);
      System.out.printf("Started new workflow instance with random ID: %s%n", eventInstanceId);
      client.raiseEvent(eventInstanceId, "TestException", null);
      System.out.printf("Event raised for workflow with instanceId: %s\n", eventInstanceId);

      System.out.println(separatorStr);
      String instanceToTerminateId = "terminateMe";
      client.scheduleNewWorkflow(DemoWorkflow.class, null, instanceToTerminateId);
      System.out.printf("Started new workflow instance with specified ID: %s%n", instanceToTerminateId);

      TimeUnit.SECONDS.sleep(5);
      System.out.println("Terminate this workflow instance manually before the timeout is reached");
      client.terminateWorkflow(instanceToTerminateId, null);
      System.out.println(separatorStr);

      String restartingInstanceId = "restarting";
      client.scheduleNewWorkflow(DemoWorkflow.class, null, restartingInstanceId);
      System.out.printf("Started new  workflow instance with ID: %s%n", restartingInstanceId);
      System.out.println("Sleeping 30 seconds to restart the workflow");
      TimeUnit.SECONDS.sleep(30);

      System.out.println("**SendExternalMessage: RestartEvent**");
      client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");

      System.out.println("Sleeping 30 seconds to terminate the eternal workflow");
      TimeUnit.SECONDS.sleep(30);
      client.terminateWorkflow(restartingInstanceId, null);
    }

    System.out.println("Exiting DemoWorkflowClient.");
    System.exit(0);
  }
}

Sidecar API

等待 sidecar

DaprClient 还提供了一个辅助方法来等待 sidecar 变得健康(仅限组件)。使用此方法时,请确保指定超时时间(以毫秒为单位)并使用 block() 来等待反应操作的结果。

// 在尝试使用 Dapr 组件之前,等待 Dapr sidecar 报告健康。
try (DaprClient client = new DaprClientBuilder().build()) {
  System.out.println("Waiting for Dapr sidecar ...");
  client.waitForSidecar(10000).block(); // 指定超时时间(以毫秒为单位)
  System.out.println("Dapr sidecar is ready.");
  ...
}

// 在此处执行 Dapr 组件操作,例如获取秘密或保存状态。

关闭 sidecar

try (DaprClient client = new DaprClientBuilder().build()) {
  logger.info("Sending shutdown request.");
  client.shutdown().block();
  logger.info("Ensuring dapr has stopped.");
  ...
}

了解更多关于 Dapr Java SDK 可用于添加到您的 Java 应用程序的包

相关链接

3 - 开始使用 Dapr 和 Spring Boot

如何开始使用 Dapr 和 Spring Boot

通过将 Dapr 和 Spring Boot 结合使用,我们可以创建不依赖于特定基础设施的 Java 应用程序,这些应用程序可以部署在不同的环境中,并支持多种本地和云服务提供商。

首先,我们将从一个简单的集成开始,涵盖 DaprClientTestcontainers 的集成,然后利用 Spring 和 Spring Boot 的机制及编程模型来使用 Dapr API。这有助于团队消除连接到特定环境基础设施(如数据库、键值存储、消息代理、配置/密钥存储等)所需的客户端和驱动程序等依赖项。

将 Dapr 和 Spring Boot 集成添加到您的项目中

如果您已经有一个 Spring Boot 应用程序(Spring Boot 3.x+),可以直接将以下依赖项添加到您的项目中:

	<dependency>
        <groupId>io.dapr.spring</groupId>
		<artifactId>dapr-spring-boot-starter</artifactId>
		<version>0.13.1</version>
	</dependency>
	<dependency>
		<groupId>io.dapr.spring</groupId>
		<artifactId>dapr-spring-boot-starter-test</artifactId>
		<version>0.13.1</version>
		<scope>test</scope>
	</dependency>

通过添加这些依赖项,您可以:

  • 自动装配一个 DaprClient 以在您的应用程序中使用
  • 使用 Spring Data 和 Messaging 的抽象及编程模型,这些模型在底层使用 Dapr API
  • 通过依赖 Testcontainers 来引导 Dapr 控制平面服务和默认组件,从而改善您的开发流程

一旦这些依赖项在您的应用程序中,您可以依赖 Spring Boot 自动配置来自动装配一个 DaprClient 实例:

@Autowired
private DaprClient daprClient;

这将连接到默认的 Dapr gRPC 端点 localhost:50001,需要您在应用程序外启动 Dapr。

您可以在应用程序中的任何地方使用 DaprClient 与 Dapr API 交互,例如在 REST 端点内部:

@RestController
public class DemoRestController {
  @Autowired
  private DaprClient daprClient;

  @PostMapping("/store")
  public void storeOrder(@RequestBody Order order){
    daprClient.saveState("kvstore", order.orderId(), order).block();
  }
}

record Order(String orderId, Integer amount){}

如果您想避免在 Spring Boot 应用程序外管理 Dapr,可以依赖 Testcontainers 来在开发过程中引导 Dapr。为此,我们可以创建一个测试配置,使用 Testcontainers 来引导我们需要的所有内容,以使用 Dapr API 开发我们的应用程序。

通过使用 Testcontainers 和 Dapr 的集成,我们可以让 @TestConfiguration 为我们的应用程序引导 Dapr。注意,在此示例中,我们配置了一个名为 kvstore 的 Statestore 组件,该组件连接到由 Testcontainers 引导的 PostgreSQL 实例。

@TestConfiguration(proxyBeanMethods = false)
public class DaprTestContainersConfig {
  @Bean
  @ServiceConnection
  public DaprContainer daprContainer(Network daprNetwork, PostgreSQLContainer<?> postgreSQLContainer){
    
    return new DaprContainer("daprio/daprd:1.14.1")
            .withAppName("producer-app")
            .withNetwork(daprNetwork)
            .withComponent(new Component("kvstore", "state.postgresql", "v1", STATE_STORE_PROPERTIES))
            .withComponent(new Component("kvbinding", "bindings.postgresql", "v1", BINDING_PROPERTIES))
            .dependsOn(postgreSQLContainer);
  }
}

在测试类路径中,您可以添加一个新的 Spring Boot 应用程序,使用此配置进行测试:

@SpringBootApplication
public class TestProducerApplication {

  public static void main(String[] args) {

    SpringApplication
            .from(ProducerApplication::main)
            .with(DaprTestContainersConfig.class)
            .run(args);
  }
  
}

现在您可以启动您的应用程序:

mvn spring-boot:test-run

运行此命令将启动应用程序,使用提供的测试配置,其中包括 Testcontainers 和 Dapr 集成。在日志中,您应该能够看到 daprdplacement 服务容器已为您的应用程序启动。

除了之前的配置(DaprTestContainersConfig),您的测试不应该测试 Dapr 本身,只需测试您的应用程序暴露的 REST 端点。

利用 Spring 和 Spring Boot 编程模型与 Dapr

Java SDK 允许您与所有 Dapr 构建块 接口。但如果您想利用 Spring 和 Spring Boot 编程模型,可以使用 dapr-spring-boot-starter 集成。这包括 Spring Data 的实现(KeyValueTemplateCrudRepository)以及用于生产和消费消息的 DaprMessagingTemplate(类似于 Spring KafkaSpring PulsarSpring AMQP for RabbitMQ)。

使用 Spring Data CrudRepositoryKeyValueTemplate

您可以使用依赖于 Dapr 实现的知名 Spring Data 构造。使用 Dapr,您无需添加任何与基础设施相关的驱动程序或客户端,使您的 Spring 应用程序更轻量化,并与其运行的环境解耦。

在底层,这些实现使用 Dapr Statestore 和 Binding API。

配置参数

使用 Spring Data 抽象,您可以配置 Dapr 将用于连接到可用基础设施的 statestore 和 bindings。这可以通过设置以下属性来完成:

dapr.statestore.name=kvstore
dapr.statestore.binding=kvbinding

然后您可以像这样 @Autowire 一个 KeyValueTemplateCrudRepository

@RestController
@EnableDaprRepositories
public class OrdersRestController {
  @Autowired
  private OrderRepository repository;
  
  @PostMapping("/orders")
  public void storeOrder(@RequestBody Order order){
    repository.save(order);
  }

  @GetMapping("/orders")
  public Iterable<Order> getAll(){
    return repository.findAll();
  }
}

其中 OrderRepository 在一个扩展 Spring Data CrudRepository 接口的接口中定义:

public interface OrderRepository extends CrudRepository<Order, String> {}

注意,@EnableDaprRepositories 注解完成了在 CrudRespository 接口下连接 Dapr API 的所有工作。因为 Dapr 允许用户从同一个应用程序与不同的 StateStores 交互,作为用户,您需要提供以下 bean 作为 Spring Boot @Configuration

@Configuration
@EnableConfigurationProperties({DaprStateStoreProperties.class})
public class ProducerAppConfiguration {
  
  @Bean
  public KeyValueAdapterResolver keyValueAdapterResolver(DaprClient daprClient, ObjectMapper mapper, DaprStateStoreProperties daprStatestoreProperties) {
    String storeName = daprStatestoreProperties.getName();
    String bindingName = daprStatestoreProperties.getBinding();

    return new DaprKeyValueAdapterResolver(daprClient, mapper, storeName, bindingName);
  }

  @Bean
  public DaprKeyValueTemplate daprKeyValueTemplate(KeyValueAdapterResolver keyValueAdapterResolver) {
    return new DaprKeyValueTemplate(keyValueAdapterResolver);
  }
}

使用 Spring Messaging 生产和消费事件

类似于 Spring Kafka、Spring Pulsar 和 Spring AMQP,您可以使用 DaprMessagingTemplate 将消息发布到配置的基础设施。要消费消息,您可以使用 @Topic 注解(即将重命名为 @DaprListener)。

要发布事件/消息,您可以在 Spring 应用程序中 @Autowired DaprMessagingTemplate。在此示例中,我们将发布 Order 事件,并将消息发送到名为 topic 的主题。

@Autowired
private DaprMessagingTemplate<Order> messagingTemplate;

@PostMapping("/orders")
public void storeOrder(@RequestBody Order order){
  repository.save(order);
  messagingTemplate.send("topic", order);
}

CrudRepository 类似,我们需要指定要使用哪个 PubSub 代理来发布和消费我们的消息。

dapr.pubsub.name=pubsub

因为使用 Dapr,您可以连接到多个 PubSub 代理,您需要提供以下 bean 以让 Dapr 知道您的 DaprMessagingTemplate 将使用哪个 PubSub 代理:

@Bean
public DaprMessagingTemplate<Order> messagingTemplate(DaprClient daprClient,
                                                             DaprPubSubProperties daprPubSubProperties) {
  return new DaprMessagingTemplate<>(daprClient, daprPubSubProperties.getName());
}

最后,因为 Dapr PubSub 需要您的应用程序和 Dapr 之间的双向连接,您需要使用一些参数扩展您的 Testcontainers 配置:

@Bean
@ServiceConnection
public DaprContainer daprContainer(Network daprNetwork, PostgreSQLContainer<?> postgreSQLContainer, RabbitMQContainer rabbitMQContainer){
    
    return new DaprContainer("daprio/daprd:1.14.1")
            .withAppName("producer-app")
            .withNetwork(daprNetwork)
            .withComponent(new Component("kvstore", "state.postgresql", "v1", STATE_STORE_PROPERTIES))
            .withComponent(new Component("kvbinding", "bindings.postgresql", "v1", BINDING_PROPERTIES))
            .withComponent(new Component("pubsub", "pubsub.rabbitmq", "v1", rabbitMqProperties))
            .withAppPort(8080)
            .withAppChannelAddress("host.testcontainers.internal")
            .dependsOn(rabbitMQContainer)
            .dependsOn(postgreSQLContainer);
}

现在,在 Dapr 配置中,我们包含了一个 pubsub 组件,该组件将连接到由 Testcontainers 启动的 RabbitMQ 实例。我们还设置了两个重要参数 .withAppPort(8080).withAppChannelAddress("host.testcontainers.internal"),这允许 Dapr 在代理中发布消息时联系回应用程序。

要监听事件/消息,您需要在应用程序中暴露一个端点,该端点将负责接收消息。如果您暴露一个 REST 端点,可以使用 @Topic 注解让 Dapr 知道它需要将事件/消息转发到哪里:

@PostMapping("subscribe")
@Topic(pubsubName = "pubsub", name = "topic")
public void subscribe(@RequestBody CloudEvent<Order> cloudEvent){
    events.add(cloudEvent);
}

在引导您的应用程序时,Dapr 将注册订阅,以便将消息转发到您的应用程序暴露的 subscribe 端点。

如果您正在为这些订阅者编写测试,您需要确保 Testcontainers 知道您的应用程序将在端口 8080 上运行,以便 Testcontainers 启动的容器知道您的应用程序在哪里:

@BeforeAll
public static void setup(){
  org.testcontainers.Testcontainers.exposeHostPorts(8080);
}

您可以在此处查看并运行完整示例源代码

下一步

了解更多关于 Dapr Java SDK 可用于添加到您的 Java 应用程序的包的信息。

相关链接