
Workflow

Orchestrate logic across various microservices

1 - Workflow overview

Overview of Dapr Workflow

Dapr Workflow makes it easy for developers to write business logic and integrations in a reliable way. Since Dapr Workflows are stateful, they support long-running and fault-tolerant applications, ideal for orchestrating microservices. Dapr Workflow works seamlessly with other Dapr building blocks, such as service invocation, pub/sub, state management, and bindings.

The durable, resilient Dapr Workflow capability:

  • Offers a built-in workflow runtime for driving Dapr Workflow execution.
  • Provides SDKs for authoring workflows in code, using any language.
  • Provides HTTP and gRPC APIs for managing workflows (start, query, pause/resume, raise event, terminate, purge).
Diagram showing basics of Dapr Workflow

Some example scenarios that Dapr Workflow can perform are:

  • Order processing involving orchestration between inventory management, payment systems, and shipping services.
  • HR onboarding workflows coordinating tasks across multiple departments and participants.
  • Orchestrating the roll-out of digital menu updates in a national restaurant chain.
  • Image processing workflows involving API-based classification and storage.

Features

Workflows and activities

With Dapr Workflow, you can write activities and then orchestrate those activities in a workflow. Workflow activities are:

  • The basic unit of work in a workflow.
  • Used for calling other (Dapr) services, and for interacting with state stores and pub/sub brokers.
  • Used for calling external third-party services.

Learn more about workflow activities.

Child workflows

In addition to activities, you can write workflows to schedule other workflows as child workflows. A child workflow has its own instance ID, history, and status that is independent of the parent workflow that started it, except that terminating the parent workflow terminates all of the child workflows created by it. Child workflows also support automatic retry policies.

Learn more about child workflows.

Timers and reminders

As with Dapr actors, you can schedule durable, reminder-like delays for any time range.

Learn more about workflow timers and reminders.

Workflow HTTP calls to manage a workflow

When you create an application with workflow code and run it with Dapr, you can call specific workflows that reside in the application. Each individual workflow can be:

  • Started or terminated through a POST request
  • Triggered to deliver a named event through a POST request
  • Paused and then resumed through a POST request
  • Purged from your state store through a POST request
  • Queried for workflow status through a GET request

Learn more about how to manage a workflow using HTTP calls.
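
For illustration, the following curl commands sketch each of these operations against the workflow HTTP API, assuming the default Dapr HTTP port (3500) and a workflow instance with ID 12345678 created from a workflow named OrderProcessingWorkflow:

# Start a new instance of the OrderProcessingWorkflow workflow
curl -X POST "http://localhost:3500/v1.0/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678" -d '{"Name":"Paperclips"}'

# Raise a named event for the instance
curl -X POST http://localhost:3500/v1.0/workflows/dapr/12345678/raiseEvent/approval_received -d '{"approver":"alice"}'

# Pause, then resume, the instance
curl -X POST http://localhost:3500/v1.0/workflows/dapr/12345678/pause
curl -X POST http://localhost:3500/v1.0/workflows/dapr/12345678/resume

# Terminate the instance, then purge its state from the state store
curl -X POST http://localhost:3500/v1.0/workflows/dapr/12345678/terminate
curl -X POST http://localhost:3500/v1.0/workflows/dapr/12345678/purge

# Query the instance status
curl http://localhost:3500/v1.0/workflows/dapr/12345678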

Workflow patterns

Dapr Workflow simplifies complex, stateful coordination requirements in microservice architectures. The following sections describe several application patterns that can benefit from Dapr Workflow.

Learn more about different types of workflow patterns

Workflow SDKs

The Dapr Workflow authoring SDKs are language-specific SDKs that contain types and functions to implement workflow logic. The workflow logic lives in your application and is orchestrated by the Dapr Workflow engine running in the Dapr sidecar via a gRPC stream.

Supported SDKs

You can use the following SDKs to author a workflow.

| Language stack | Package |
| -------------- | ------- |
| Python | dapr-ext-workflow |
| JavaScript | DaprWorkflowClient |
| .NET | Dapr.Workflow |
| Java | io.dapr.workflows |
| Go | workflow |

Try out workflows

Quickstarts and tutorials

Want to put workflows to the test? Walk through the following quickstart and tutorials to see workflows in action:

| Quickstart/tutorial | Description |
| ------------------- | ----------- |
| Workflow quickstart | Run a workflow application with four workflow activities to see Dapr Workflow in action. |
| Workflow Python SDK example | Learn how to create a Dapr Workflow and invoke it using the Python dapr-ext-workflow package. |
| Workflow JavaScript SDK example | Learn how to create a Dapr Workflow and invoke it using the JavaScript SDK. |
| Workflow .NET SDK example | Learn how to create a Dapr Workflow and invoke it using ASP.NET Core web APIs. |
| Workflow Java SDK example | Learn how to create a Dapr Workflow and invoke it using the Java io.dapr.workflows package. |
| Workflow Go SDK example | Learn how to create a Dapr Workflow and invoke it using the Go workflow package. |

Start using workflows directly in your app

Want to skip the quickstarts? Not a problem. You can try out the workflow building block directly in your application. After Dapr is installed, you can begin using workflows, starting with how to author a workflow.

Limitations

  • State stores: Due to underlying limitations in some database choices, more commonly NoSQL databases, you might run into limits on storing internal workflow state. For example, CosmosDB limits a single operation to 100 items, so at most 100 workflow states can be stored in a single request.

Watch the demo

Watch this video for an overview of Dapr Workflow:

Next steps

Workflow features and concepts >>

2 - Features and concepts

Learn more about the Dapr Workflow features and concepts

Now that you’ve learned about the workflow building block at a high level, let’s deep dive into the features and concepts included with the Dapr Workflow engine and SDKs. Dapr Workflow exposes several core features and concepts which are common across all supported languages.

Workflows

Dapr Workflows are functions you write that define a series of tasks to be executed in a particular order. The Dapr Workflow engine takes care of scheduling and execution of the tasks, including managing failures and retries. If the app hosting your workflows is scaled out across multiple machines, the workflow engine load balances the execution of workflows and their tasks across multiple machines.

There are several different kinds of tasks that a workflow can schedule, including:

  • Activities for executing custom logic.
  • Durable timers for putting the workflow to sleep for arbitrary lengths of time.
  • Child workflows for breaking larger workflows into smaller pieces.
  • External event waiters for blocking workflows until they receive external event signals.

These tasks are described in more detail in their corresponding sections. The following sketch illustrates each kind of task.
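
This is a minimal sketch using the Python SDK (dapr-ext-workflow); process_order and notify_workflow are hypothetical functions used only for illustration:

import dapr.ext.workflow as wf
from datetime import timedelta


def example_workflow(ctx: wf.DaprWorkflowContext, order_id: str):
    # Activity: execute custom logic (process_order is a hypothetical activity)
    result = yield ctx.call_activity(process_order, input=order_id)

    # Durable timer: put the workflow to sleep for an arbitrary length of time
    yield ctx.create_timer(fire_at=ctx.current_utc_datetime + timedelta(hours=1))

    # Child workflow: break a larger workflow into smaller pieces
    yield ctx.call_child_workflow(notify_workflow, input=result)

    # External event waiter: block until an external signal is received
    approval = yield ctx.wait_for_external_event("approval_received")
    return approval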

Workflow identity

Each workflow you define has a type name, and individual executions of a workflow require a unique instance ID. Workflow instance IDs can be generated by your app code, which is useful when workflows correspond to business entities like documents or jobs, or can be auto-generated UUIDs. A workflow’s instance ID is useful for debugging and also for managing workflows using the Workflow APIs.

Only one workflow instance with a given ID can exist at any given time. However, if a workflow instance completes or fails, its ID can be reused by a new workflow instance. Note, however, that the new workflow instance effectively replaces the old one in the configured state store.
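
As a sketch of using a business identifier as the instance ID with the Python SDK (order_processing_workflow is a hypothetical workflow function):

import dapr.ext.workflow as wf

wf_client = wf.DaprWorkflowClient()

# Reuse a business identifier, such as an order number, as the instance ID
instance_id = wf_client.schedule_new_workflow(
    workflow=order_processing_workflow,
    input={"Name": "Paperclips", "Quantity": 1},
    instance_id="order-12345678",
)

# The same ID can later be used to manage and debug the instance
state = wf_client.get_workflow_state(instance_id)
print(state.runtime_status)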

Workflow replay

Dapr Workflows maintain their execution state by using a technique known as event sourcing. Instead of storing the current state of a workflow as a snapshot, the workflow engine manages an append-only log of history events that describe the various steps that a workflow has taken. When using the workflow SDK, these history events are stored automatically whenever the workflow “awaits” for the result of a scheduled task.

When a workflow “awaits” a scheduled task, it unloads itself from memory until the task completes. Once the task completes, the workflow engine schedules the workflow function to run again. This second workflow function execution is known as a replay.

When a workflow function is replayed, it runs again from the beginning. However, when it encounters a task that already completed, instead of scheduling that task again, the workflow engine:

  1. Returns the stored result of the completed task to the workflow.
  2. Continues execution until the next “await” point.

This “replay” behavior continues until the workflow function completes or fails with an error.

Using this replay technique, a workflow is able to resume execution from any “await” point as if it had never been unloaded from memory. Even the values of local variables from previous runs can be restored without the workflow engine knowing anything about what data they stored. This ability to restore state makes Dapr Workflows durable and fault tolerant.
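
One practical consequence of replay, sketched below with the Python SDK, is that side effects such as logging should be guarded with the context's is_replaying flag so they don't repeat on every replay (step1 and step2 are hypothetical activities):

import dapr.ext.workflow as wf


def replay_aware_workflow(ctx: wf.DaprWorkflowContext, wf_input: str):
    # On replay, this returns the stored result instead of re-running the activity
    result1 = yield ctx.call_activity(step1, input=wf_input)

    # Guard side effects so they run only on the first (non-replayed) execution
    if not ctx.is_replaying:
        print(f"step1 completed with: {result1}")

    result2 = yield ctx.call_activity(step2, input=result1)
    return result2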

Infinite loops and eternal workflows

As discussed in the workflow replay section, workflows maintain an append-only, event-sourced history log of all their operations. To avoid runaway resource usage, workflows must limit the number of operations they schedule. For example, ensure your workflow doesn’t:

  • Use infinite loops in its implementation
  • Schedule thousands of tasks.

You can use the following two techniques to write workflows that may need to schedule very large numbers of tasks:

  1. Use the continue-as-new API: Each workflow SDK exposes a continue-as-new API that workflows can invoke to restart themselves with a new input and history. The continue-as-new API is especially well suited to implementing “eternal workflows”, like monitoring agents, which would otherwise be implemented using a while (true)-like construct. Using continue-as-new is a great way to keep the workflow history size small (see the sketch after this list).

    The continue-as-new API truncates the existing history, replacing it with a new history.

  2. Use child workflows: Each workflow SDK exposes an API for creating child workflows. A child workflow behaves like any other workflow, except that it’s scheduled by a parent workflow. Child workflows have:

    • Their own history
    • The benefit of distributing workflow function execution across multiple machines.

    If a workflow needs to schedule thousands of tasks or more, it’s recommended that those tasks be distributed across child workflows so that no single workflow’s history size grows too large.
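
A minimal Python sketch of the continue-as-new technique, assuming a hypothetical do_monitoring_pass activity:

import dapr.ext.workflow as wf
from datetime import timedelta


def eternal_workflow(ctx: wf.DaprWorkflowContext, counter: int):
    yield ctx.call_activity(do_monitoring_pass, input=counter)
    yield ctx.create_timer(fire_at=ctx.current_utc_datetime + timedelta(minutes=5))

    # Truncate the history and restart with a new input instead of looping forever
    ctx.continue_as_new(counter + 1)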

Updating workflow code

Because workflows are long-running and durable, updating workflow code must be done with extreme care. As discussed in the workflow determinism limitation section, workflow code must be deterministic. Updates to workflow code must preserve this determinism if there are any non-completed workflow instances in the system. Otherwise, updates to workflow code can result in runtime failures the next time those workflows execute.

See known limitations

Workflow activities

Workflow activities are the basic unit of work in a workflow and are the tasks that get orchestrated in the business process. For example, you might create a workflow to process an order. The tasks may involve checking the inventory, charging the customer, and creating a shipment. Each task would be a separate activity. These activities may be executed serially, in parallel, or some combination of both.

Unlike workflows, activities aren’t restricted in the type of work you can do in them. Activities are frequently used to make network calls or run CPU intensive operations. An activity can also return data back to the workflow.

The Dapr Workflow engine guarantees that each called activity is executed at least once as part of a workflow’s execution. Because activities only guarantee at-least-once execution, it’s recommended that activity logic be implemented as idempotent whenever possible.
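
For example, here is a sketch of an idempotent activity in Python; payment_gateway is a hypothetical client whose charge operation is keyed by the order ID so that a repeated execution doesn't double-charge:

import dapr.ext.workflow as wf


def charge_customer(ctx: wf.WorkflowActivityContext, order: dict):
    # At-least-once execution means this activity may run more than once,
    # so check for an existing charge before creating a new one.
    if payment_gateway.charge_exists(order["id"]):
        return payment_gateway.get_charge(order["id"])
    return payment_gateway.charge(order["id"], order["total"])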

Child workflows

In addition to activities, workflows can schedule other workflows as child workflows. A child workflow has its own instance ID, history, and status that is independent of the parent workflow that started it.

Child workflows have many benefits:

  • You can split large workflows into a series of smaller child workflows, making your code more maintainable.
  • You can distribute workflow logic across multiple compute nodes concurrently, which is useful if your workflow logic otherwise needs to coordinate a lot of tasks.
  • You can reduce memory usage and CPU overhead by keeping the history of the parent workflow smaller.

The return value of a child workflow is its output. If a child workflow fails with an exception, then that exception is surfaced to the parent workflow, just like it is when an activity task fails with an exception. Child workflows also support automatic retry policies.

Terminating a parent workflow terminates all of the child workflows created by the workflow instance. See the terminate workflow api for more information.
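
A sketch of scheduling child workflows with the Python SDK (process_item is a hypothetical activity):

import dapr.ext.workflow as wf


def parent_workflow(ctx: wf.DaprWorkflowContext, items: list):
    # Each child workflow gets its own instance ID, history, and status
    tasks = [ctx.call_child_workflow(child_workflow, input=item) for item in items]
    results = yield wf.when_all(tasks)
    return results


def child_workflow(ctx: wf.DaprWorkflowContext, item: str):
    result = yield ctx.call_activity(process_item, input=item)
    return result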

Durable timers

Dapr Workflows allow you to schedule reminder-like durable delays for any time range, including minutes, days, or even years. These durable timers can be scheduled by workflows to implement simple delays or to set up ad-hoc timeouts on other async tasks. More specifically, a durable timer can be set to trigger on a particular date or after a specified duration. There are no limits to the maximum duration of durable timers, which are backed internally by actor reminders. For example, a workflow that tracks a 30-day free subscription to a service could be implemented using a durable timer that fires 30 days after the workflow is created. Workflows can be safely unloaded from memory while waiting for a durable timer to fire.
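
The 30-day subscription example might be sketched in Python as follows (send_trial_expired_notice is a hypothetical activity):

import dapr.ext.workflow as wf
from datetime import timedelta


def free_trial_workflow(ctx: wf.DaprWorkflowContext, user_id: str):
    # The timer fires 30 days after the workflow is created; the workflow
    # can be safely unloaded from memory while it waits.
    yield ctx.create_timer(fire_at=ctx.current_utc_datetime + timedelta(days=30))
    yield ctx.call_activity(send_trial_expired_notice, input=user_id)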

Retry policies

Workflows support durable retry policies for activities and child workflows. Workflow retry policies are separate and distinct from Dapr resiliency policies in the following ways.

  • Workflow retry policies are configured by the workflow author in code, whereas Dapr Resiliency policies are configured by the application operator in YAML.
  • Workflow retry policies are durable and maintain their state across application restarts, whereas Dapr Resiliency policies are not durable and must be re-applied after application restarts.
  • Workflow retry policies are triggered by unhandled errors/exceptions in activities and child workflows, whereas Dapr Resiliency policies are triggered by operation timeouts and connectivity faults.

Retries are internally implemented using durable timers. This means that workflows can be safely unloaded from memory while waiting for a retry to fire, conserving system resources. This also means that delays between retries can be arbitrarily long, including minutes, hours, or even days.

It’s possible to use both workflow retry policies and Dapr Resiliency policies together. For example, if a workflow activity uses a Dapr client to invoke a service, the Dapr client uses the configured resiliency policy. See Quickstart: Service-to-service resiliency for more information and an example. However, if the activity itself fails for any reason, including exhausting the retries on the resiliency policy, then the workflow's retry policy kicks in.

Because workflow retry policies are configured in code, the exact developer experience may vary depending on the version of the workflow SDK. In general, workflow retry policies can be configured with the following parameters.

| Parameter | Description |
| --------- | ----------- |
| Maximum number of attempts | The maximum number of times to execute the activity or child workflow. |
| First retry interval | The amount of time to wait before the first retry. |
| Backoff coefficient | The coefficient used to determine the rate of increase of backoff. For example, a coefficient of 2 doubles the wait of each subsequent retry. |
| Maximum retry interval | The maximum amount of time to wait before each subsequent retry. |
| Retry timeout | The overall timeout for retries, regardless of any configured max number of attempts. |

External events

Sometimes workflows will need to wait for events that are raised by external systems. For example, an approval workflow may require a human to explicitly approve an order request within an order processing workflow if the total cost exceeds some threshold. Another example is a trivia game orchestration workflow that pauses while waiting for all participants to submit their answers to trivia questions. These mid-execution inputs are referred to as external events.

External events have a name and a payload and are delivered to a single workflow instance. Workflows can create “wait for external event” tasks that subscribe to external events and await those tasks to block execution until the event is received. The workflow can then read the payload of these events and make decisions about which next steps to take. External events can be processed serially or in parallel. External events can be raised by other workflows or by workflow code.

Workflows can also wait for multiple external event signals of the same name, in which case they are dispatched to the corresponding workflow tasks in a first-in, first-out (FIFO) manner. If a workflow receives an external event signal but has not yet created a “wait for external event” task, the event will be saved into the workflow’s history and consumed immediately after the workflow requests the event.
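
As a sketch, an external client can deliver such an event to a specific instance with the Python SDK's workflow client (the instance ID shown is hypothetical):

import dapr.ext.workflow as wf

wf_client = wf.DaprWorkflowClient()

# Deliver a named event with a payload to a single workflow instance
wf_client.raise_workflow_event(
    instance_id="order-12345678",
    event_name="approval_received",
    data={"approver": "alice"},
)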

Learn more about external system interaction.

Purging

Workflow state can be purged from a state store, removing all of its history and all metadata related to a specific workflow instance. The purge capability is used for workflows that have run to a COMPLETED, FAILED, or TERMINATED state.
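
For example, with the Python SDK's workflow client (the instance ID shown is hypothetical):

import dapr.ext.workflow as wf

wf_client = wf.DaprWorkflowClient()

# Remove all history and metadata for a completed, failed, or terminated instance
wf_client.purge_workflow(instance_id="order-12345678")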

Learn more in the workflow API reference guide.

Limitations

Workflow determinism and code constraints

To take advantage of the workflow replay technique, your workflow code needs to be deterministic. For your workflow code to be deterministic, you may need to work around some limitations.

Workflow functions must call deterministic APIs.

APIs that generate random numbers, random UUIDs, or the current date are non-deterministic. To work around this limitation, you can:

  • Use these APIs in activity functions, or
  • (Preferred) Use built-in equivalent APIs offered by the SDK. For example, each authoring SDK provides an API for retrieving the current time in a deterministic manner.

For example, instead of this:

// DON'T DO THIS! (C#)
DateTime currentTime = DateTime.UtcNow;
Guid newIdentifier = Guid.NewGuid();
string randomString = GetRandomString();

// DON'T DO THIS! (Java)
Instant currentTime = Instant.now();
UUID newIdentifier = UUID.randomUUID();
String randomString = getRandomString();

// DON'T DO THIS! (JavaScript)
const currentTime = new Date();
const newIdentifier = uuidv4();
const randomString = getRandomString();

// DON'T DO THIS! (Go)
currentTime := time.Now()

Do this:

// Do this!! (C#)
DateTime currentTime = context.CurrentUtcDateTime;
Guid newIdentifier = context.NewGuid();
string randomString = await context.CallActivityAsync<string>(nameof(GetRandomString)); // Use nameof to avoid referencing an activity name that doesn't exist in your application

// Do this!! (Java)
Instant currentTime = context.getCurrentInstant();
UUID newIdentifier = context.newUuid();
String randomString = context.callActivity(GetRandomString.class.getName(), String.class).await();

// Do this!! (JavaScript)
const currentTime = context.getCurrentUtcDateTime();
const randomString = yield context.callActivity(getRandomString);

// Do this!! (Go)
currentTime := ctx.CurrentUTCDateTime()

Workflow functions must only interact indirectly with external state.

External data includes any data that isn’t stored in the workflow state. Workflows must not read or write global variables, environment variables, or the file system, and must not make network calls directly.

Instead, workflows should interact with external state indirectly using workflow inputs, activity tasks, and through external event handling.

For example, instead of this:

// DON'T DO THIS! (C#)
string configuration = Environment.GetEnvironmentVariable("MY_CONFIGURATION")!;
string data = await new HttpClient().GetStringAsync("https://example.com/api/data");

// DON'T DO THIS! (Java)
String configuration = System.getenv("MY_CONFIGURATION");

HttpRequest request = HttpRequest.newBuilder().uri(new URI("https://postman-echo.com/post")).GET().build();
HttpResponse<String> response = HttpClient.newBuilder().build().send(request, HttpResponse.BodyHandlers.ofString());

// DON'T DO THIS! (JavaScript)
// Accessing an environment variable (Node.js)
const configuration = process.env.MY_CONFIGURATION;

fetch('https://postman-echo.com/get')
  .then(response => response.text())
  .then(data => {
    console.log(data);
  })
  .catch(error => {
    console.error('Error:', error);
  });

// DON'T DO THIS! (Go)
resp, err := http.Get("http://example.com/api/data")

Do this:

// Do this!! (C#)
string configuration = workflowInput.Configuration; // imaginary workflow input argument
string data = await context.CallActivityAsync<string>(nameof(MakeHttpCall), "https://example.com/api/data");

// Do this!! (Java)
String configuration = ctx.getInput(InputType.class).getConfiguration(); // imaginary workflow input argument
String data = ctx.callActivity(MakeHttpCall.class.getName(), "https://example.com/api/data", String.class).await();

// Do this!! (JavaScript)
const configuration = workflowInput.getConfiguration(); // imaginary workflow input argument
const data = yield ctx.callActivity(makeHttpCall, "https://example.com/api/data");

// Do this!! (Go)
err := ctx.CallActivity(MakeHttpCallActivity, workflow.ActivityInput("https://example.com/api/data")).Await(&output)

Workflow functions must execute only on the workflow dispatch thread.

The implementation of each language SDK requires that all workflow function operations operate on the same thread (goroutine, etc.) that the function was scheduled on. Workflow functions must never:

  • Schedule background threads, or
  • Use APIs that schedule a callback function to run on another thread.

Failure to follow this rule could result in undefined behavior. Any background processing should instead be delegated to activity tasks, which can be scheduled to run serially or concurrently.

For example, instead of this:

// DON'T DO THIS! (C#)
Task t = Task.Run(() => context.CallActivityAsync("DoSomething"));
await context.CreateTimer(5000).ConfigureAwait(false);

// DON'T DO THIS! (Java)
new Thread(() -> {
    ctx.callActivity(DoSomethingActivity.class.getName()).await();
}).start();
ctx.createTimer(Duration.ofSeconds(5)).await();

// DON'T DO THIS! (JavaScript)
// Don't declare a JavaScript workflow as async; the Node.js runtime doesn't
// guarantee that asynchronous functions are deterministic.

// DON'T DO THIS! (Go)
go func() {
  err := ctx.CallActivity(DoSomething).Await(nil)
}()
err := ctx.CreateTimer(time.Second).Await(nil)

Do this:

// Do this!! (C#)
Task t = context.CallActivityAsync(nameof(DoSomething));
await context.CreateTimer(5000).ConfigureAwait(true);

// Do this!! (Java)
ctx.callActivity(DoSomethingActivity.class.getName()).await();
ctx.createTimer(Duration.ofSeconds(5)).await();

// Do this!! (JavaScript)
// Since the Node.js runtime doesn't guarantee that asynchronous functions are
// deterministic, always declare JavaScript workflows as synchronous generator functions.

// Do this!! (Go)
task := ctx.CallActivity(DoSomething)
task.Await(nil)

Updating workflow code

Make sure updates you make to the workflow code maintain its determinism. A couple of examples of code updates that can break workflow determinism:

  • Changing workflow function signatures: Changing the name, input, or output of a workflow or activity function is considered a breaking change and must be avoided.

  • Changing the number or order of workflow tasks: Changing the number or order of workflow tasks causes a workflow instance’s history to no longer match the code and may result in runtime errors or other unexpected behavior.

To work around these constraints:

  • Instead of updating existing workflow code, leave the existing workflow code as-is and create new workflow definitions that include the updates.
  • Upstream code that creates workflows should only be updated to create instances of the new workflows.
  • Leave the old code around to ensure that existing workflow instances can continue to run without interruption. If and when it’s known that all instances of the old workflow logic have completed, then the old workflow code can be safely deleted.
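
A Python sketch of this versioning approach (charge and validate are hypothetical activities):

import dapr.ext.workflow as wf


# Leave the original definition in place so in-flight instances can finish
def order_workflow(ctx: wf.DaprWorkflowContext, order: dict):
    result = yield ctx.call_activity(charge, input=order)
    return result


# Put updated logic in a new definition with a new name
def order_workflow_v2(ctx: wf.DaprWorkflowContext, order: dict):
    yield ctx.call_activity(validate, input=order)  # new step added in v2
    result = yield ctx.call_activity(charge, input=order)
    return result


# Upstream code schedules only the new workflow from now on
wf_client = wf.DaprWorkflowClient()
wf_client.schedule_new_workflow(workflow=order_workflow_v2, input={"id": "order-1"})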

Next steps

Workflow patterns >>

3 - Workflow patterns

Write different types of workflow patterns

Dapr Workflows simplify complex, stateful coordination requirements in microservice architectures. The following sections describe several application patterns that can benefit from Dapr Workflows.

Task chaining

In the task chaining pattern, multiple steps in a workflow are run in succession, and the output of one step may be passed as the input to the next step. Task chaining workflows typically involve creating a sequence of operations that need to be performed on some data, such as filtering, transforming, and reducing.

Diagram showing how the task chaining workflow pattern works

In some cases, the steps of the workflow may need to be orchestrated across multiple microservices. For increased reliability and scalability, you’re also likely to use queues to trigger the various steps.

While the pattern is simple, there are many complexities hidden in the implementation. For example:

  • What happens if one of the microservices is unavailable for an extended period of time?
  • Can failed steps be automatically retried?
  • If not, how do you facilitate the rollback of previously completed steps, if applicable?
  • Implementation details aside, is there a way to visualize the workflow so that other engineers can understand what it does and how it works?

Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example.

import dapr.ext.workflow as wf


def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    try:
        result1 = yield ctx.call_activity(step1, input=wf_input)
        result2 = yield ctx.call_activity(step2, input=result1)
        result3 = yield ctx.call_activity(step3, input=result2)
    except Exception as e:
        yield ctx.call_activity(error_handler, input=str(e))
        raise
    return [result1, result2, result3]


def step1(ctx, activity_input):
    print(f'Step 1: Received input: {activity_input}.')
    # Do some work
    return activity_input + 1


def step2(ctx, activity_input):
    print(f'Step 2: Received input: {activity_input}.')
    # Do some work
    return activity_input * 2


def step3(ctx, activity_input):
    print(f'Step 3: Received input: {activity_input}.')
    # Do some work
    return activity_input ** 2


def error_handler(ctx, error):
    print(f'Executing error handler: {error}.')
    # Apply some compensating work

Note: Workflow retry policies will be available in a future version of the Python SDK.

import { DaprWorkflowClient, WorkflowActivityContext, WorkflowContext, WorkflowRuntime, TWorkflow } from "@dapr/dapr";

async function start() {
  // Update the gRPC client and worker to use a local address and port
  const daprHost = "localhost";
  const daprPort = "50001";
  const workflowClient = new DaprWorkflowClient({
    daprHost,
    daprPort,
  });
  const workflowRuntime = new WorkflowRuntime({
    daprHost,
    daprPort,
  });

  const hello = async (_: WorkflowActivityContext, name: string) => {
    return `Hello ${name}!`;
  };

  const sequence: TWorkflow = async function* (ctx: WorkflowContext): any {
    const cities: string[] = [];

    const result1 = yield ctx.callActivity(hello, "Tokyo");
    cities.push(result1);
    const result2 = yield ctx.callActivity(hello, "Seattle");
    cities.push(result2);
    const result3 = yield ctx.callActivity(hello, "London");
    cities.push(result3);

    return cities;
  };

  workflowRuntime.registerWorkflow(sequence).registerActivity(hello);

  // Wrap the worker startup in a try-catch block to handle any errors during startup
  try {
    await workflowRuntime.start();
    console.log("Workflow runtime started successfully");
  } catch (error) {
    console.error("Error starting workflow runtime:", error);
  }

  // Schedule a new orchestration
  try {
    const id = await workflowClient.scheduleNewWorkflow(sequence);
    console.log(`Orchestration scheduled with ID: ${id}`);

    // Wait for orchestration completion
    const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);

    console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  } catch (error) {
    console.error("Error scheduling or waiting for orchestration:", error);
  }

  await workflowRuntime.stop();
  await workflowClient.stop();

  // stop the dapr side car
  process.exit(0);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
// Exponential backoff retry policy that survives long outages
var retryOptions = new WorkflowTaskOptions
{
    RetryPolicy = new WorkflowRetryPolicy(
        firstRetryInterval: TimeSpan.FromMinutes(1),
        backoffCoefficient: 2.0,
        maxRetryInterval: TimeSpan.FromHours(1),
        maxNumberOfAttempts: 10),
};

try
{
    var result1 = await context.CallActivityAsync<string>("Step1", wfInput, retryOptions);
    var result2 = await context.CallActivityAsync<byte[]>("Step2", result1, retryOptions);
    var result3 = await context.CallActivityAsync<long[]>("Step3", result2, retryOptions);
    return string.Join(", ", result3);
}
catch (TaskFailedException) // Task failures are surfaced as TaskFailedException
{
    // Retries expired - apply custom compensation logic
    await context.CallActivityAsync<long[]>("MyCompensation", options: retryOptions);
    throw;
}

Note: In the example above, "Step1", "Step2", "Step3", and "MyCompensation" represent workflow activities, which are functions in your code that actually implement the steps of the workflow. For brevity, these activity implementations are left out of this example.

public class ChainWorkflow extends Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            StringBuilder sb = new StringBuilder();
            String wfInput = ctx.getInput(String.class);
            String result1 = ctx.callActivity("Step1", wfInput, String.class).await();
            String result2 = ctx.callActivity("Step2", result1, String.class).await();
            String result3 = ctx.callActivity("Step3", result2, String.class).await();
            String result = sb.append(result1).append(',').append(result2).append(',').append(result3).toString();
            ctx.complete(result);
        };
    }
}

    class Step1 implements WorkflowActivity {

        @Override
        public Object run(WorkflowActivityContext ctx) {
            Logger logger = LoggerFactory.getLogger(Step1.class);
            logger.info("Starting Activity: " + ctx.getName());
            // Do some work
            return null;
        }
    }

    class Step2 implements WorkflowActivity {

        @Override
        public Object run(WorkflowActivityContext ctx) {
            Logger logger = LoggerFactory.getLogger(Step2.class);
            logger.info("Starting Activity: " + ctx.getName());
            // Do some work
            return null;
        }
    }

    class Step3 implements WorkflowActivity {

        @Override
        public Object run(WorkflowActivityContext ctx) {
            Logger logger = LoggerFactory.getLogger(Step3.class);
            logger.info("Starting Activity: " + ctx.getName());
            // Do some work
            return null;
        }
    }
func TaskChainWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	var result1 int
	if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result1); err != nil {
		return nil, err
	}
	var result2 int
	if err := ctx.CallActivity(Step2, workflow.ActivityInput(result1)).Await(&result2); err != nil {
		return nil, err
	}
	var result3 int
	if err := ctx.CallActivity(Step3, workflow.ActivityInput(result2)).Await(&result3); err != nil {
		return nil, err
	}
	return []int{result1, result2, result3}, nil
}
func Step1(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	fmt.Printf("Step 1: Received input: %s", input)
	return input + 1, nil
}
func Step2(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	fmt.Printf("Step 2: Received input: %s", input)
	return input * 2, nil
}
func Step3(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	fmt.Printf("Step 3: Received input: %s", input)
	return int(math.Pow(float64(input), 2)), nil
}

As you can see, the workflow is expressed as a simple series of statements in the programming language of your choice. This allows any engineer in the organization to quickly understand the end-to-end flow without necessarily needing to understand the end-to-end system architecture.

Behind the scenes, the Dapr Workflow runtime:

  • Takes care of executing the workflow and ensuring that it runs to completion.
  • Saves progress automatically.
  • Automatically resumes the workflow from the last completed step if the workflow process itself fails for any reason.
  • Enables error handling to be expressed naturally in your target programming language, allowing you to implement compensation logic easily.
  • Provides built-in retry configuration primitives to simplify the process of configuring complex retry policies for individual steps in the workflow.

Fan-out/fan-in

In the fan-out/fan-in design pattern, you execute multiple tasks simultaneously across potentially multiple workers, wait for them to finish, and perform some aggregation on the result.

Diagram showing how the fan-out/fan-in workflow pattern works

In addition to the challenges mentioned in the previous pattern, there are several important questions to consider when implementing the fan-out/fan-in pattern manually:

  • How do you control the degree of parallelism?
  • How do you know when to trigger subsequent aggregation steps?
  • What if the number of parallel steps is dynamic?

Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example:

import time
from typing import List
import dapr.ext.workflow as wf


def batch_processing_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    # get a batch of N work items to process in parallel
    work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)

    # schedule N parallel tasks to process the work items and wait for all to complete
    parallel_tasks = [ctx.call_activity(process_work_item, input=work_item) for work_item in work_batch]
    outputs = yield wf.when_all(parallel_tasks)

    # aggregate the results and send them to another activity
    total = sum(outputs)
    yield ctx.call_activity(process_results, input=total)


def get_work_batch(ctx, batch_size: int) -> List[int]:
    return [i + 1 for i in range(batch_size)]


def process_work_item(ctx, work_item: int) -> int:
    print(f'Processing work item: {work_item}.')
    time.sleep(5)
    result = work_item * 2
    print(f'Work item {work_item} processed. Result: {result}.')
    return result


def process_results(ctx, final_result: int):
    print(f'Final result: {final_result}.')
import {
  Task,
  DaprWorkflowClient,
  WorkflowActivityContext,
  WorkflowContext,
  WorkflowRuntime,
  TWorkflow,
} from "@dapr/dapr";

// Wrap the entire code in an immediately-invoked async function
async function start() {
  // Update the gRPC client and worker to use a local address and port
  const daprHost = "localhost";
  const daprPort = "50001";
  const workflowClient = new DaprWorkflowClient({
    daprHost,
    daprPort,
  });
  const workflowRuntime = new WorkflowRuntime({
    daprHost,
    daprPort,
  });

  function getRandomInt(min: number, max: number): number {
    return Math.floor(Math.random() * (max - min + 1)) + min;
  }

  async function getWorkItemsActivity(_: WorkflowActivityContext): Promise<string[]> {
    const count: number = getRandomInt(2, 10);
    console.log(`generating ${count} work items...`);

    const workItems: string[] = Array.from({ length: count }, (_, i) => `work item ${i}`);
    return workItems;
  }

  function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  async function processWorkItemActivity(context: WorkflowActivityContext, item: string): Promise<number> {
    console.log(`processing work item: ${item}`);

    // Simulate some work that takes a variable amount of time
    const sleepTime = Math.random() * 5000;
    await sleep(sleepTime);

    // Return a result for the given work item, which is also a random number in this case
    // For more information about random numbers in workflow please check
    // https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-code-constraints?tabpane=csharp#random-numbers
    return Math.floor(Math.random() * 11);
  }

  const workflow: TWorkflow = async function* (ctx: WorkflowContext): any {
    const tasks: Task<any>[] = [];
    const workItems = yield ctx.callActivity(getWorkItemsActivity);
    for (const workItem of workItems) {
      tasks.push(ctx.callActivity(processWorkItemActivity, workItem));
    }
    const results: number[] = yield ctx.whenAll(tasks);
    const sum: number = results.reduce((accumulator, currentValue) => accumulator + currentValue, 0);
    return sum;
  };

  workflowRuntime.registerWorkflow(workflow);
  workflowRuntime.registerActivity(getWorkItemsActivity);
  workflowRuntime.registerActivity(processWorkItemActivity);

  // Wrap the worker startup in a try-catch block to handle any errors during startup
  try {
    await workflowRuntime.start();
    console.log("Worker started successfully");
  } catch (error) {
    console.error("Error starting worker:", error);
  }

  // Schedule a new orchestration
  try {
    const id = await workflowClient.scheduleNewWorkflow(workflow);
    console.log(`Orchestration scheduled with ID: ${id}`);

    // Wait for orchestration completion
    const state = await workflowClient.waitForWorkflowCompletion(id, undefined, 30);

    console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  } catch (error) {
    console.error("Error scheduling or waiting for orchestration:", error);
  }

  // stop worker and client
  await workflowRuntime.stop();
  await workflowClient.stop();

  // stop the dapr side car
  process.exit(0);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);

// Schedule the parallel tasks, but don't wait for them to complete yet.
var parallelTasks = new List<Task<int>>(workBatch.Length);
for (int i = 0; i < workBatch.Length; i++)
{
    Task<int> task = context.CallActivityAsync<int>("ProcessWorkItem", workBatch[i]);
    parallelTasks.Add(task);
}

// Everything is scheduled. Wait here until all parallel tasks have completed.
await Task.WhenAll(parallelTasks);

// Aggregate all N outputs and publish the result.
int sum = parallelTasks.Sum(t => t.Result);
await context.CallActivityAsync("PostResults", sum);
public class FaninoutWorkflow extends Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            // Get a list of N work items to process in parallel.
            Object[] workBatch = ctx.callActivity("GetWorkBatch", Object[].class).await();
            // Schedule the parallel tasks, but don't wait for them to complete yet.
            List<Task<Integer>> tasks = Arrays.stream(workBatch)
                    .map(workItem -> ctx.callActivity("ProcessWorkItem", workItem, int.class))
                    .collect(Collectors.toList());
            // Everything is scheduled. Wait here until all parallel tasks have completed.
            List<Integer> results = ctx.allOf(tasks).await();
            // Aggregate all N outputs and publish the result.
            int sum = results.stream().mapToInt(Integer::intValue).sum();
            ctx.complete(sum);
        };
    }
}
func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return 0, err
	}
	var workBatch []int
	if err := ctx.CallActivity(GetWorkBatch, workflow.ActivityInput(input)).Await(&workBatch); err != nil {
		return 0, err
	}
	parallelTasks := workflow.NewTaskSlice(len(workBatch))
	for i, workItem := range workBatch {
		parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, workflow.ActivityInput(workItem))
	}
	var outputs int
	for _, task := range parallelTasks {
		var output int
		err := task.Await(&output)
		if err == nil {
			outputs += output
		} else {
			return 0, err
		}
	}
	if err := ctx.CallActivity(ProcessResults, workflow.ActivityInput(outputs)).Await(nil); err != nil {
		return 0, err
	}
	return 0, nil
}
func GetWorkBatch(ctx workflow.ActivityContext) (any, error) {
	var batchSize int
	if err := ctx.GetInput(&batchSize); err != nil {
		return 0, err
	}
	batch := make([]int, batchSize)
	for i := 0; i < batchSize; i++ {
		batch[i] = i
	}
	return batch, nil
}
func ProcessWorkItem(ctx workflow.ActivityContext) (any, error) {
	var workItem int
	if err := ctx.GetInput(&workItem); err != nil {
		return 0, err
	}
	fmt.Printf("Processing work item: %d\n", workItem)
	time.Sleep(time.Second * 5)
	result := workItem * 2
	fmt.Printf("Work item %d processed. Result: %d\n", workItem, result)
	return result, nil
}
func ProcessResults(ctx workflow.ActivityContext) (any, error) {
	var finalResult int
	if err := ctx.GetInput(&finalResult); err != nil {
		return 0, err
	}
	fmt.Printf("Final result: %d\n", finalResult)
	return finalResult, nil
}

The key takeaways from this example are:

  • The fan-out/fan-in pattern can be expressed as a simple function using ordinary programming constructs
  • The number of parallel tasks can be static or dynamic
  • The workflow itself is capable of aggregating the results of parallel executions

Furthermore, the execution of the workflow is durable. If a workflow starts 100 parallel task executions and only 40 complete before the process crashes, the workflow restarts itself automatically and only schedules the remaining 60 tasks.

It’s possible to go further and limit the degree of concurrency using simple, language-specific constructs. The sample code below illustrates how to restrict the degree of fan-out to just 5 concurrent activity executions:


//Revisiting the earlier example...
// Get a list of N work items to process in parallel.
object[] workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);

const int MaxParallelism = 5;
var results = new List<int>();
var inFlightTasks = new HashSet<Task<int>>();
foreach(var workItem in workBatch)
{
  if (inFlightTasks.Count >= MaxParallelism)
  {
    var finishedTask = await Task.WhenAny(inFlightTasks);
    results.Add(finishedTask.Result);
    inFlightTasks.Remove(finishedTask);
  }

  inFlightTasks.Add(context.CallActivityAsync<int>("ProcessWorkItem", workItem));
}
results.AddRange(await Task.WhenAll(inFlightTasks));

var sum = results.Sum(t => t);
await context.CallActivityAsync("PostResults", sum);

With the release of 1.16, the .NET SDK makes it even easier to process workflow activities in parallel while putting an upper cap on concurrency, using the following extension method on the WorkflowContext:

//Revisiting the earlier example...
// Get a list of work items to process
var workBatch = await context.CallActivityAsync<object[]>("GetWorkBatch", null);

// Process deterministically in parallel with an upper cap of 5 activities at a time
var results = await context.ProcessInParallelAsync(workBatch, workItem => context.CallActivityAsync<int>("ProcessWorkItem", workItem), maxConcurrency: 5);

var sum = results.Sum(t => t);
await context.CallActivityAsync("PostResults", sum);

Limiting the degree of concurrency in this way can be useful for limiting contention against shared resources. For example, if the activities need to call into external resources that have their own concurrency limits, like databases or external APIs, it can be useful to ensure that no more than a specified number of activities call that resource concurrently.
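
For comparison, here is a rough Python sketch of the same in-flight throttling loop, reusing the activities from the earlier fan-out/fan-in example; it assumes that yielding wf.when_any returns the first completed task, whose value is then read with get_result:

import dapr.ext.workflow as wf

MAX_PARALLELISM = 5


def throttled_fanout_workflow(ctx: wf.DaprWorkflowContext, wf_input: int):
    work_batch = yield ctx.call_activity(get_work_batch, input=wf_input)

    in_flight = []
    results = []
    for work_item in work_batch:
        if len(in_flight) >= MAX_PARALLELISM:
            # Wait for any one in-flight task to finish before scheduling more
            finished = yield wf.when_any(in_flight)
            results.append(finished.get_result())
            in_flight.remove(finished)
        in_flight.append(ctx.call_activity(process_work_item, input=work_item))

    # Drain the remaining in-flight tasks and aggregate the results
    results.extend((yield wf.when_all(in_flight)))
    yield ctx.call_activity(process_results, input=sum(results))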

Async HTTP APIs

Asynchronous HTTP APIs are typically implemented using the Asynchronous Request-Reply pattern. Implementing this pattern traditionally involves the following:

  1. A client sends a request to an HTTP API endpoint (the start API)
  2. The start API writes a message to a backend queue, which triggers the start of a long-running operation
  3. Immediately after scheduling the backend operation, the start API returns an HTTP 202 response to the client with an identifier that can be used to poll for status
  4. The status API queries a database that contains the status of the long-running operation
  5. The client repeatedly polls the status API either until some timeout expires or it receives a “completion” response

The end-to-end flow is illustrated in the following diagram.

Diagram showing how the async request response pattern works

The challenge with implementing the asynchronous request-reply pattern is that it involves the use of multiple APIs and state stores. It also involves implementing the protocol correctly so that the client knows how to automatically poll for status and when the operation is complete.

The Dapr workflow HTTP API supports the asynchronous request-reply pattern out of the box, without requiring you to write any code or do any state management.

The following curl commands illustrate how the workflow APIs support this pattern.

curl -X POST http://localhost:3500/v1.0/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678 -d '{"Name":"Paperclips","Quantity":1,"TotalCost":9.95}'

The previous command will result in the following response JSON:

{"instanceID":"12345678"}

The HTTP client can then construct the status query URL using the workflow instance ID and poll it repeatedly until it sees the “COMPLETED”, “FAILED”, or “TERMINATED” status in the payload.

curl http://localhost:3500/v1.0/workflows/dapr/12345678

The following is an example of what an in-progress workflow status might look like.

{
  "instanceID": "12345678",
  "workflowName": "OrderProcessingWorkflow",
  "createdAt": "2023-05-03T23:22:11.143069826Z",
  "lastUpdatedAt": "2023-05-03T23:22:22.460025267Z",
  "runtimeStatus": "RUNNING",
  "properties": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":9.95}"
  }
}

As you can see from the previous example, the workflow’s runtime status is RUNNING, which lets the client know that it should continue polling.

If the workflow has completed, the status might look as follows.

{
  "instanceID": "12345678",
  "workflowName": "OrderProcessingWorkflow",
  "createdAt": "2023-05-03T23:30:11.381146313Z",
  "lastUpdatedAt": "2023-05-03T23:30:52.923870615Z",
  "runtimeStatus": "COMPLETED",
  "properties": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":9.95}",
    "dapr.workflow.output": "{\"Processed\":true}"
  }
}

As you can see from the previous example, the runtime status of the workflow is now COMPLETED, which means the client can stop polling for updates.

Monitor

The monitor pattern is a recurring process that typically:

  1. Checks the status of a system
  2. Takes some action based on that status, such as sending a notification
  3. Sleeps for some period of time
  4. Repeats

The following diagram provides a rough illustration of this pattern.

Diagram showing how the monitor pattern works

Depending on the business needs, there may be a single monitor or there may be multiple monitors, one for each business entity (for example, a stock). Furthermore, the amount of time to sleep may need to change, depending on the circumstances. These requirements make using cron-based scheduling systems impractical.

Dapr Workflow supports this pattern natively by allowing you to implement eternal workflows. Rather than writing infinite while-loops (which is an anti-pattern), Dapr Workflow exposes a continue-as-new API that workflow authors can use to restart a workflow function from the beginning with a new input.

from dataclasses import dataclass
from datetime import timedelta
import random
import dapr.ext.workflow as wf


@dataclass
class JobStatus:
    job_id: str
    is_healthy: bool


def status_monitor_workflow(ctx: wf.DaprWorkflowContext, job: JobStatus):
    # poll a status endpoint associated with this job
    status = yield ctx.call_activity(check_status, input=job)
    if not ctx.is_replaying:
        print(f"Job '{job.job_id}' is {status}.")

    if status == "healthy":
        job.is_healthy = True
        next_sleep_interval = 60  # check less frequently when healthy
    else:
        if job.is_healthy:
            job.is_healthy = False
            yield ctx.call_activity(send_alert, input=f"Job '{job.job_id}' is unhealthy!")
        next_sleep_interval = 5  # check more frequently when unhealthy

    yield ctx.create_timer(fire_at=ctx.current_utc_datetime + timedelta(minutes=next_sleep_interval))

    # restart from the beginning with a new JobStatus input
    ctx.continue_as_new(job)


def check_status(ctx, _) -> str:
    return random.choice(["healthy", "unhealthy"])


def send_alert(ctx, message: str):
    print(f'*** Alert: {message}')
const statusMonitorWorkflow: TWorkflow = async function* (ctx: WorkflowContext): any {
    let duration;
    const status = yield ctx.callActivity(checkStatusActivity);
    if (status === "healthy") {
      // Check less frequently when in a healthy state
      // set duration to 1 hour
      duration = 60 * 60;
    } else {
      yield ctx.callActivity(alertActivity, "job unhealthy");
      // Check more frequently when in an unhealthy state
      // set duration to 5 minutes
      duration = 5 * 60;
    }

    // Put the workflow to sleep until the determined time
    yield ctx.createTimer(duration);

    // Restart from the beginning with the updated state
    ctx.continueAsNew();
  };
public override async Task<object> RunAsync(WorkflowContext context, MyEntityState myEntityState)
{
    TimeSpan nextSleepInterval;

    var status = await context.CallActivityAsync<string>("GetStatus");
    if (status == "healthy")
    {
        myEntityState.IsHealthy = true;

        // Check less frequently when in a healthy state
        nextSleepInterval = TimeSpan.FromMinutes(60);
    }
    else
    {
        if (myEntityState.IsHealthy)
        {
            myEntityState.IsHealthy = false;
            await context.CallActivityAsync("SendAlert", myEntityState);
        }

        // Check more frequently when in an unhealthy state
        nextSleepInterval = TimeSpan.FromMinutes(5);
    }

    // Put the workflow to sleep until the determined time
    await context.CreateTimer(nextSleepInterval);

    // Restart from the beginning with the updated state
    context.ContinueAsNew(myEntityState);
    return null;
}

This example assumes you have a predefined MyEntityState class with a boolean IsHealthy property.

public class MonitorWorkflow extends Workflow {

  @Override
  public WorkflowStub create() {
    return ctx -> {

      Duration nextSleepInterval;

      var status = ctx.callActivity(DemoWorkflowStatusActivity.class.getName(), DemoStatusActivityOutput.class).await();
      var isHealthy = status.getIsHealthy();

      if (isHealthy) {
        // Check less frequently when in a healthy state
        nextSleepInterval = Duration.ofMinutes(60);
      } else {

        ctx.callActivity(DemoWorkflowAlertActivity.class.getName()).await();

        // Check more frequently when in an unhealthy state
        nextSleepInterval = Duration.ofMinutes(5);
      }

      // Put the workflow to sleep until the determined time
      try {
        ctx.createTimer(nextSleepInterval).await();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }

      // Restart from the beginning with the updated state
      ctx.continueAsNew();
    };
  }
}
type JobStatus struct {
	JobID     string `json:"job_id"`
	IsHealthy bool   `json:"is_healthy"`
}
func StatusMonitorWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var sleepInterval time.Duration
	var job JobStatus
	if err := ctx.GetInput(&job); err != nil {
		return "", err
	}
	var status string
	if err := ctx.CallActivity(CheckStatus, workflow.ActivityInput(job)).Await(&status); err != nil {
		return "", err
	}
	if status == "healthy" {
		job.IsHealthy = true
		sleepInterval = time.Minute * 60
	} else {
		if job.IsHealthy {
			job.IsHealthy = false
			err := ctx.CallActivity(SendAlert, workflow.ActivityInput(fmt.Sprintf("Job '%s' is unhealthy!", job.JobID))).Await(nil)
			if err != nil {
				return "", err
			}
		}
		sleepInterval = time.Minute * 5
	}
	if err := ctx.CreateTimer(sleepInterval).Await(nil); err != nil {
		return "", err
	}
	ctx.ContinueAsNew(job, false)
	return "", nil
}
func CheckStatus(ctx workflow.ActivityContext) (any, error) {
	statuses := []string{"healthy", "unhealthy"}
	return statuses[rand.Intn(len(statuses))], nil
}
func SendAlert(ctx workflow.ActivityContext) (any, error) {
	var message string
	if err := ctx.GetInput(&message); err != nil {
		return "", err
	}
	fmt.Printf("*** Alert: %s", message)
	return "", nil
}

A workflow implementing the monitor pattern can loop forever or it can terminate itself gracefully by not calling continue-as-new.

External system interaction

In some cases, a workflow may need to pause and wait for an external system to perform some action. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API.

Another very common scenario is when a workflow needs to pause and wait for a human, for example when approving a purchase order. Dapr Workflow supports this event pattern via the external events feature.

Here’s an example workflow for a purchase order involving a human:

  1. A workflow is triggered when a purchase order is received.
  2. A rule in the workflow determines that a human needs to perform some action. For example, the purchase order cost exceeds a certain auto-approval threshold.
  3. The workflow sends a notification requesting a human action. For example, it sends an email with an approval link to a designated approver.
  4. The workflow pauses and waits for the human to either approve or reject the order by clicking on a link.
  5. If the approval isn’t received within the specified time, the workflow resumes and performs some compensation logic, such as canceling the order.

The following diagram illustrates this flow.

Diagram showing how the external system interaction pattern works with a human involved

The following example code shows how this pattern can be implemented using Dapr Workflow.

from dataclasses import dataclass
from datetime import timedelta
import dapr.ext.workflow as wf


@dataclass
class Order:
    cost: float
    product: str
    quantity: int

    def __str__(self):
        return f'{self.product} ({self.quantity})'


@dataclass
class Approval:
    approver: str

    @staticmethod
    def from_dict(dict):
        return Approval(**dict)


def purchase_order_workflow(ctx: wf.DaprWorkflowContext, order: Order):
    # Orders under $1000 are auto-approved
    if order.cost < 1000:
        return "Auto-approved"

    # Orders of $1000 or more require manager approval
    yield ctx.call_activity(send_approval_request, input=order)

    # Approvals must be received within 24 hours or they will be canceled.
    approval_event = ctx.wait_for_external_event("approval_received")
    timeout_event = ctx.create_timer(timedelta(hours=24))
    winner = yield wf.when_any([approval_event, timeout_event])
    if winner == timeout_event:
        return "Cancelled"

    # The order was approved
    yield ctx.call_activity(place_order, input=order)
    approval_details = Approval.from_dict(approval_event.get_result())
    return f"Approved by '{approval_details.approver}'"


def send_approval_request(_, order: Order) -> None:
    print(f'*** Sending approval request for order: {order}')


def place_order(_, order: Order) -> None:
    print(f'*** Placing order: {order}')
import {
  Task,
  DaprWorkflowClient,
  WorkflowActivityContext,
  WorkflowContext,
  WorkflowRuntime,
  TWorkflow,
} from "@dapr/dapr";
import * as readlineSync from "readline-sync";

// Wrap the entire code in an immediately-invoked async function
async function start() {
  class Order {
    cost: number;
    product: string;
    quantity: number;
    constructor(cost: number, product: string, quantity: number) {
      this.cost = cost;
      this.product = product;
      this.quantity = quantity;
    }
  }

  function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }

  // Update the gRPC client and worker to use a local address and port
  const daprHost = "localhost";
  const daprPort = "50001";
  const workflowClient = new DaprWorkflowClient({
    daprHost,
    daprPort,
  });
  const workflowRuntime = new WorkflowRuntime({
    daprHost,
    daprPort,
  });

  // Activity function that sends an approval request to the manager
  const sendApprovalRequest = async (_: WorkflowActivityContext, order: Order) => {
    // Simulate some work that takes an amount of time
    await sleep(3000);
    console.log(`Sending approval request for order: ${order.product}`);
  };

  // Activity function that places an order
  const placeOrder = async (_: WorkflowActivityContext, order: Order) => {
    console.log(`Placing order: ${order.product}`);
  };

  // Orchestrator function that represents a purchase order workflow
  const purchaseOrderWorkflow: TWorkflow = async function* (ctx: WorkflowContext, order: Order): any {
    // Orders under $1000 are auto-approved
    if (order.cost < 1000) {
      return "Auto-approved";
    }

    // Orders of $1000 or more require manager approval
    yield ctx.callActivity(sendApprovalRequest, order);

    // Approvals must be received within 24 hours or they will be cancelled.
    const tasks: Task<any>[] = [];
    const approvalEvent = ctx.waitForExternalEvent("approval_received");
    const timeoutEvent = ctx.createTimer(24 * 60 * 60);
    tasks.push(approvalEvent);
    tasks.push(timeoutEvent);
    const winner = yield ctx.whenAny(tasks);

    if (winner == timeoutEvent) {
      return "Cancelled";
    }

    yield ctx.callActivity(placeOrder, order);
    const approvalDetails = approvalEvent.getResult();
    return `Approved by ${approvalDetails.approver}`;
  };

  workflowRuntime
    .registerWorkflow(purchaseOrderWorkflow)
    .registerActivity(sendApprovalRequest)
    .registerActivity(placeOrder);

  // Wrap the worker startup in a try-catch block to handle any errors during startup
  try {
    await workflowRuntime.start();
    console.log("Worker started successfully");
  } catch (error) {
    console.error("Error starting worker:", error);
  }

  // Schedule a new orchestration
  try {
    const cost = readlineSync.questionInt("Cost of your order:");
    const approver = readlineSync.question("Approver of your order:");
    const timeout = readlineSync.questionInt("Timeout for your order in seconds:");
    const order = new Order(cost, "MyProduct", 1);
    const id = await workflowClient.scheduleNewWorkflow(purchaseOrderWorkflow, order);
    console.log(`Orchestration scheduled with ID: ${id}`);

    // prompt for approval asynchronously
    promptForApproval(approver, workflowClient, id);

    // Wait for orchestration completion
    const state = await workflowClient.waitForWorkflowCompletion(id, undefined, timeout + 2);

    console.log(`Orchestration completed! Result: ${state?.serializedOutput}`);
  } catch (error) {
    console.error("Error scheduling or waiting for orchestration:", error);
  }

  // stop worker and client
  await workflowRuntime.stop();
  await workflowClient.stop();

  // stop the dapr side car
  process.exit(0);
}

async function promptForApproval(approver: string, workflowClient: DaprWorkflowClient, id: string) {
  if (readlineSync.keyInYN("Press [Y] to approve the order... Y/yes, N/no")) {
    const approvalEvent = { approver: approver };
    await workflowClient.raiseEvent(id, "approval_received", approvalEvent);
  } else {
    return "Order rejected";
  }
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});

public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
    // ...(other steps)...

    // Require orders over a certain threshold to be approved
    if (order.TotalCost > OrderApprovalThreshold)
    {
        try
        {
            // Request human approval for this order
            await context.CallActivityAsync(nameof(RequestApprovalActivity), order);

            // Pause and wait for a human to approve the order
            ApprovalResult approvalResult = await context.WaitForExternalEventAsync<ApprovalResult>(
                eventName: "ManagerApproval",
                timeout: TimeSpan.FromDays(3));
            if (approvalResult == ApprovalResult.Rejected)
            {
                // The order was rejected, end the workflow here
                return new OrderResult(Processed: false);
            }
        }
        catch (TaskCanceledException)
        {
            // An approval timeout results in automatic order cancellation
            return new OrderResult(Processed: false);
        }
    }

    // ...(other steps)...

    // End the workflow with a success result
    return new OrderResult(Processed: true);
}

Note: In the example above, RequestApprovalActivity is the name of a workflow activity to invoke and ApprovalResult is an enumeration defined by the workflow app. For brevity, these definitions were left out of the example code.

public class ExternalSystemInteractionWorkflow extends Workflow {
    @Override
    public WorkflowStub create() {
        return ctx -> {
            // ...other steps...
            Integer orderCost = ctx.getInput(int.class);
            // Require orders over a certain threshold to be approved
            if (orderCost > ORDER_APPROVAL_THRESHOLD) {
                try {
                    // Request human approval for this order
                    ctx.callActivity("RequestApprovalActivity", orderCost, Void.class).await();
                    // Pause and wait for a human to approve the order
                    boolean approved = ctx.waitForExternalEvent("ManagerApproval", Duration.ofDays(3), boolean.class).await();
                    if (!approved) {
                        // The order was rejected, end the workflow here
                        ctx.complete("Process reject");
                    }
                } catch (TaskCanceledException e) {
                    // An approval timeout results in automatic order cancellation
                    ctx.complete("Process cancel");
                }
            }
            // ...other steps...

            // End the workflow with a success result
            ctx.complete("Process approved");
        };
    }
}

type Order struct {
	Cost     float64 `json:"cost"`
	Product  string  `json:"product"`
	Quantity int     `json:"quantity"`
}
type Approval struct {
	Approver string `json:"approver"`
}
func PurchaseOrderWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var order Order
	if err := ctx.GetInput(&order); err != nil {
		return "", err
	}
	// Orders under $1000 are auto-approved
	if order.Cost < 1000 {
		return "Auto-approved", nil
	}
	// Orders of $1000 or more require manager approval
	if err := ctx.CallActivity(SendApprovalRequest, workflow.ActivityInput(order)).Await(nil); err != nil {
		return "", err
	}
	// Approvals must be received within 24 hours or they will be cancelled
	var approval Approval
	if err := ctx.WaitForExternalEvent("approval_received", time.Hour*24).Await(&approval); err != nil {
		// Assume a timeout has taken place; in any case, this is an error.
		return "error/cancelled", err
	}
	// The order was approved
	if err := ctx.CallActivity(PlaceOrder, workflow.ActivityInput(order)).Await(nil); err != nil {
		return "", err
	}
	return fmt.Sprintf("Approved by %s", approval.Approver), nil
}
func SendApprovalRequest(ctx workflow.ActivityContext) (any, error) {
	var order Order
	if err := ctx.GetInput(&order); err != nil {
		return "", err
	}
	fmt.Printf("*** Sending approval request for order: %v\n", order)
	return "", nil
}
func PlaceOrder(ctx workflow.ActivityContext) (any, error) {
	var order Order
	if err := ctx.GetInput(&order); err != nil {
		return "", err
	}
	fmt.Printf("*** Placing order: %v", order)
	return "", nil
}

The code that delivers the event to resume the workflow execution is external to the workflow. Workflow events can be delivered to a waiting workflow instance using the raise event workflow management API, as shown in the following example:

from dapr.clients import DaprClient
from dataclasses import asdict

with DaprClient() as d:
    d.raise_workflow_event(
        instance_id=instance_id,
        workflow_component="dapr",
        event_name="approval_received",
        event_data=asdict(Approval("Jane Doe")))

import { DaprWorkflowClient } from "@dapr/dapr";

const workflowClient = new DaprWorkflowClient();

// Raise the event to the waiting workflow instance;
// workflowInstanceId is the ID returned when the workflow was scheduled
await workflowClient.raiseEvent(workflowInstanceId, "approval_received", { approver: "Jane Doe" });

// Raise the workflow event to the waiting workflow
await daprClient.RaiseWorkflowEventAsync(
    instanceId: orderId,
    workflowComponent: "dapr",
    eventName: "ManagerApproval",
    eventData: ApprovalResult.Approved);

System.out.println("**SendExternalMessage: RestartEvent**");
client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");

func raiseEvent() {
  daprClient, err := client.NewClient()
  if err != nil {
    log.Fatalf("failed to initialize the client")
  }
  err = daprClient.RaiseEventWorkflow(context.Background(), &client.RaiseEventWorkflowRequest{
    InstanceID: "instance_id",
    WorkflowComponent: "dapr",
    EventName: "approval_received",
    EventData: Approval{
      Approver: "Jane Doe",
    },
  })
  if err != nil {
    log.Fatalf("failed to raise event on workflow")
  }
  log.Println("raised an event on specified workflow")
}

External events don’t have to be directly triggered by humans. They can also be triggered by other systems. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API.
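
For example, here’s a minimal sketch of such a listener in Python. It assumes a Flask app using Dapr’s programmatic pub/sub subscription endpoint, a pub/sub component named "pubsub", and that the published event payload carries the target workflow instance ID; none of these names come from the examples above.

from flask import Flask, request, jsonify
from dapr.clients import DaprClient

app = Flask(__name__)

# Tell Dapr which pub/sub topics this app subscribes to
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    return jsonify([{
        'pubsubname': 'pubsub',        # assumed pub/sub component name
        'topic': 'payments',           # assumed topic name
        'route': '/payment-received',
    }])

# Invoked on each payment event; raises the event the workflow is waiting for
@app.route('/payment-received', methods=['POST'])
def payment_received():
    event = request.json
    with DaprClient() as d:
        d.raise_workflow_event(
            instance_id=event['data']['instance_id'],  # assumed to be in the payload
            workflow_component='dapr',
            event_name='approval_received',
            event_data=event['data'])
    return jsonify({'status': 'SUCCESS'})

if __name__ == '__main__':
    app.run(port=6001)  # arbitrary app port for this sketch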

Compensation

The compensation pattern (also known as the saga pattern) provides a mechanism for rolling back or undoing operations that have already been executed when a workflow fails partway through. This pattern is particularly important for long-running workflows that span multiple microservices where traditional database transactions are not feasible.

In distributed microservice architectures, you often need to coordinate operations across multiple services. When these operations cannot be wrapped in a single transaction, the compensation pattern provides a way to maintain consistency by defining compensating actions for each step in the workflow.

The compensation pattern addresses several critical challenges:

  • Distributed Transaction Management: When a workflow spans multiple microservices, each with their own data stores, traditional ACID transactions are not possible. The compensation pattern provides transactional consistency by ensuring operations are either all completed successfully or all undone through compensation.
  • Partial Failure Recovery: If a workflow fails after some steps have completed successfully, the compensation pattern allows you to undo those completed steps gracefully.
  • Business Process Integrity: Ensures that business processes can be properly rolled back in case of failures, maintaining the integrity of your business operations.
  • Long-Running Processes: For workflows that may run for hours, days, or longer, traditional locking mechanisms are impractical. Compensation provides a way to handle failures in these scenarios.

Common use cases for the compensation pattern include:

  • E-commerce Order Processing: Reserve inventory, charge payment, and ship orders. If shipping fails, you need to release the inventory and refund the payment.
  • Financial Transactions: In a money transfer, if crediting the destination account fails, you need to rollback the debit from the source account.
  • Resource Provisioning: When provisioning cloud resources across multiple providers, if one step fails, you need to clean up all previously provisioned resources.
  • Multi-Step Business Processes: Any business process that involves multiple irreversible steps that may need to be undone in case of later failures.

Dapr Workflow provides support for the compensation pattern, allowing you to register compensation activities for each step and execute them in reverse order when needed.

Here’s an example workflow for an e-commerce process:

  1. A workflow is triggered when an order is received.
  2. A reservation is made for the order in the inventory.
  3. The payment is processed.
  4. The order is shipped.
  5. If any of the above actions results in an error, the actions are compensated with another action:
    • The shipment is cancelled.
    • The payment is refunded.
    • The inventory reservation is released.

The following diagram illustrates this flow.

Diagram showing how the compensation pattern works.

public class PaymentProcessingWorkflow implements Workflow {
    
    @Override
    public WorkflowStub create() {
        return ctx -> {
            ctx.getLogger().info("Starting Workflow: " + ctx.getName());
            var orderId = ctx.getInput(String.class);
            List<String> compensations = new ArrayList<>();
            
            try {
                // Step 1: Reserve inventory
                String reservationId = ctx.callActivity(ReserveInventoryActivity.class.getName(), orderId, String.class).await();
                ctx.getLogger().info("Inventory reserved: {}", reservationId);
                compensations.add("ReleaseInventory");
                
                // Step 2: Process payment
                String paymentId = ctx.callActivity(ProcessPaymentActivity.class.getName(), orderId, String.class).await();
                ctx.getLogger().info("Payment processed: {}", paymentId);
                compensations.add("RefundPayment");
                
                // Step 3: Ship order
                String shipmentId = ctx.callActivity(ShipOrderActivity.class.getName(), orderId, String.class).await();
                ctx.getLogger().info("Order shipped: {}", shipmentId);
                compensations.add("CancelShipment");
                
            } catch (TaskFailedException e) {
                ctx.getLogger().error("Activity failed: {}", e.getMessage());
                
                // Execute compensations in reverse order
                Collections.reverse(compensations);
                for (String compensation : compensations) {
                    try {
                        switch (compensation) {
                            case "CancelShipment":
                                String shipmentCancelResult = ctx.callActivity(
                                    CancelShipmentActivity.class.getName(), 
                                    orderId, 
                                    String.class).await();
                                ctx.getLogger().info("Shipment cancellation completed: {}", shipmentCancelResult);
                                break;
                                
                            case "RefundPayment":
                                String refundResult = ctx.callActivity(
                                    RefundPaymentActivity.class.getName(), 
                                    orderId, 
                                    String.class).await();
                                ctx.getLogger().info("Payment refund completed: {}", refundResult);
                                break;
                                
                            case "ReleaseInventory":
                                String releaseResult = ctx.callActivity(
                                    ReleaseInventoryActivity.class.getName(), 
                                    orderId, 
                                    String.class).await();
                                ctx.getLogger().info("Inventory release completed: {}", releaseResult);
                                break;
                        }
                    } catch (TaskFailedException ex) {
                        ctx.getLogger().error("Compensation activity failed: {}", ex.getMessage());
                    }
                }
                ctx.complete("Order processing failed, compensation applied");
            }

			// Step 4: Send confirmation
			ctx.callActivity(SendConfirmationActivity.class.getName(), orderId, Void.class).await();
            ctx.getLogger().info("Confirmation sent for order: {}", orderId);
                
            ctx.complete("Order processed successfully: " + orderId);
        };
    }
}

// Example activities
class ReserveInventoryActivity implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        String orderId = ctx.getInput(String.class);
        // Logic to reserve inventory
        String reservationId = "reservation_" + orderId;
        System.out.println("Reserved inventory for order: " + orderId);
        return reservationId;
    }
}

class ReleaseInventoryActivity implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        String reservationId = ctx.getInput(String.class);
        // Logic to release inventory reservation
        System.out.println("Released inventory reservation: " + reservationId);
        return "Released: " + reservationId;
    }
}

class ProcessPaymentActivity implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        String orderId = ctx.getInput(String.class);
        // Logic to process payment
        String paymentId = "payment_" + orderId;
        System.out.println("Processed payment for order: " + orderId);
        return paymentId;
    }
}

class RefundPaymentActivity implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        String paymentId = ctx.getInput(String.class);
        // Logic to refund payment
        System.out.println("Refunded payment: " + paymentId);
        return "Refunded: " + paymentId;
    }
}

class ShipOrderActivity implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        String orderId = ctx.getInput(String.class);
        // Logic to ship order
        String shipmentId = "shipment_" + orderId;
        System.out.println("Shipped order: " + orderId);
        return shipmentId;
    }
}

class CancelShipmentActivity implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        String shipmentId = ctx.getInput(String.class);
        // Logic to cancel shipment
        System.out.println("Canceled shipment: " + shipmentId);
        return "Canceled: " + shipmentId;
    }
}

class SendConfirmationActivity implements WorkflowActivity {
    @Override
    public Object run(WorkflowActivityContext ctx) {
        String orderId = ctx.getInput(String.class);
        // Logic to send confirmation
        System.out.println("Sent confirmation for order: " + orderId);
        return null;
    }
}
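
For comparison, here is a compact sketch of the same pattern using the Python SDK. The activity names and bodies are illustrative stand-ins rather than part of the example above, and the broad except clauses stand in for the SDK's specific task-failure errors:

import dapr.ext.workflow as wf

wfr = wf.WorkflowRuntime()

# Illustrative stand-in activities; real implementations would call other services
@wfr.activity(name='reserve_inventory')
def reserve_inventory(ctx: wf.WorkflowActivityContext, order_id: str) -> str:
    return f'reservation_{order_id}'

@wfr.activity(name='release_inventory')
def release_inventory(ctx: wf.WorkflowActivityContext, order_id: str) -> str:
    return f'released_{order_id}'

@wfr.activity(name='process_payment')
def process_payment(ctx: wf.WorkflowActivityContext, order_id: str) -> str:
    return f'payment_{order_id}'

@wfr.activity(name='refund_payment')
def refund_payment(ctx: wf.WorkflowActivityContext, order_id: str) -> str:
    return f'refunded_{order_id}'

@wfr.activity(name='ship_order')
def ship_order(ctx: wf.WorkflowActivityContext, order_id: str) -> str:
    return f'shipment_{order_id}'

@wfr.activity(name='cancel_shipment')
def cancel_shipment(ctx: wf.WorkflowActivityContext, order_id: str) -> str:
    return f'cancelled_{order_id}'

@wfr.workflow(name='order_workflow')
def order_workflow(ctx: wf.DaprWorkflowContext, order_id: str):
    compensations = []
    try:
        yield ctx.call_activity(reserve_inventory, input=order_id)
        compensations.append(release_inventory)
        yield ctx.call_activity(process_payment, input=order_id)
        compensations.append(refund_payment)
        yield ctx.call_activity(ship_order, input=order_id)
        compensations.append(cancel_shipment)
    except Exception:
        # Execute the registered compensations in reverse order
        for compensation in reversed(compensations):
            try:
                yield ctx.call_activity(compensation, input=order_id)
            except Exception:
                pass  # Handle compensation failures per your business requirements
        return 'Order processing failed, compensation applied'
    return f'Order processed successfully: {order_id}'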

The key benefits of using Dapr Workflow’s compensation pattern include:

  • Compensation Control: You have full control over when and how compensation activities are executed.
  • Flexible Configuration: You can implement custom logic for determining which compensations to run.
  • Error Handling: Handle compensation failures according to your specific business requirements.
  • Simple Implementation: No additional framework dependencies - just standard workflow activities and exception handling.

The compensation pattern ensures that your distributed workflows can maintain consistency and recover gracefully from failures, making it an essential tool for building reliable microservice architectures.

Next steps

Workflow architecture >>

4 - Workflow architecture

The Dapr Workflow engine architecture

Dapr Workflows allow developers to define workflows using ordinary code in a variety of programming languages. The workflow engine runs inside of the Dapr sidecar and orchestrates workflow code deployed as part of your application. Dapr Workflows are built on top of Dapr Actors providing durability and scalability for workflow execution.

This article describes:

  • The architecture of the Dapr Workflow engine
  • How the workflow engine interacts with application code
  • How the workflow engine fits into the overall Dapr architecture
  • How different workflow backends can work with the workflow engine

For more information on how to author Dapr Workflows in your application, see How to: Author a workflow.

The Dapr Workflow engine is internally powered by Dapr’s actor runtime. The following diagram illustrates the Dapr Workflow architecture in Kubernetes mode:

Diagram showing how the workflow architecture works in Kubernetes mode

To use the Dapr Workflow building block, you write workflow code in your application using the Dapr Workflow SDK, which internally connects to the sidecar using a gRPC stream. This registers the workflows and any workflow activities (tasks that workflows can schedule).

The engine is embedded directly into the sidecar and implemented using the durabletask-go framework library. This framework allows you to swap out different storage providers, including a storage provider created for Dapr that leverages internal actors behind the scenes. Since Dapr Workflows use actors, you can store workflow state in state stores.

Sidecar interactions

When a workflow application starts up, it uses a workflow authoring SDK to send a gRPC request to the Dapr sidecar and get back a stream of workflow work items, following the server streaming RPC pattern. These work items can be anything from “start a new X workflow” (where X is the type of a workflow) to “schedule activity Y with input Z to run on behalf of workflow X”.

The workflow app executes the appropriate workflow code and then sends a gRPC request back to the sidecar with the execution results.

Dapr Workflow Engine Protocol

All interactions happen over a single gRPC channel and are initiated by the application, which means the application doesn’t need to open any inbound ports. The details of these interactions are internally handled by the language-specific Dapr Workflow authoring SDK.

Differences between workflow and application actor interactions

If you’re familiar with Dapr actors, you may notice a few differences in how sidecar interactions work for workflows compared to application-defined actors.

Actors: Actors created by the application can interact with the sidecar using either HTTP or gRPC.
Workflows: Workflows only use gRPC. Due to the workflow gRPC protocol’s complexity, an SDK is required when implementing workflows.

Actors: Actor operations are pushed to application code from the sidecar. This requires the application to listen on a particular app port.
Workflows: For workflows, operations are pulled from the sidecar by the application using a streaming protocol. The application doesn’t need to listen on any ports to run workflows.

Actors: Actors explicitly register themselves with the sidecar.
Workflows: Workflows do not register themselves with the sidecar. The embedded engine doesn’t keep track of workflow types. This responsibility is instead delegated to the workflow application and its SDK.

Workflow distributed tracing

The durabletask-go core used by the workflow engine writes distributed traces using OpenTelemetry SDKs. These traces are captured automatically by the Dapr sidecar and exported to the configured OpenTelemetry provider, such as Zipkin.

Each workflow instance managed by the engine is represented as one or more spans. There is a single parent span representing the full workflow execution and child spans for the various tasks, including spans for activity task execution and durable timers.

Workflow activity code currently does not have access to the trace context.

Workflow actors

Upon the workflow client connecting to the sidecar, there are two types of actors that are registered in support of the workflow engine:

  • dapr.internal.{namespace}.{appID}.workflow
  • dapr.internal.{namespace}.{appID}.activity

The {namespace} value is the Dapr namespace and defaults to default if no namespace is configured. The {appID} value is the app’s ID. For example, if you have a workflow app named “wfapp”, then the type of the workflow actor would be dapr.internal.default.wfapp.workflow and the type of the activity actor would be dapr.internal.default.wfapp.activity.

The following diagram demonstrates how workflow actors operate in a Kubernetes scenario:

Diagram demonstrating internally registered actors across a cluster

Just like user-defined actors, workflow actors are distributed across the cluster by the hashing lookup table provided by the actor placement service. They also maintain their own state and make use of reminders. However, unlike actors that live in application code, these workflow actors are embedded into the Dapr sidecar. Application code is completely unaware that these actors exist.

Workflow actors

There are 2 different types of actors used with workflows: workflow actors and activity actors. Workflow actors are responsible for managing the state and placement of all workflows running in the app. A new instance of the workflow actor is activated for every workflow instance that gets scheduled. The ID of the workflow actor is the ID of the workflow. This workflow actor stores the state of the workflow as it progresses, and determines the node on which the workflow code executes via the actor lookup table.

As workflows are based on actors, all workflow and activity work is randomly distributed across all replicas of the application implementing workflows. There is no locality or relationship between where a workflow is started and where each work item is executed.

Each workflow actor saves its state using the following keys in the configured actor state store:

  • inbox-NNNNNN: A workflow’s inbox is effectively a FIFO queue of messages that drive a workflow’s execution. Example messages include workflow creation messages, activity task completion messages, etc. Each message is stored in its own key in the state store with the name inbox-NNNNNN, where NNNNNN is a 6-digit number indicating the ordering of the messages. These state keys are removed once the corresponding messages are consumed by the workflow.
  • history-NNNNNN: A workflow’s history is an ordered list of events that represent a workflow’s execution history. Each key in the history holds the data for a single history event. Like an append-only log, workflow history events are only added and never removed (except when a workflow performs a “continue as new” operation, which purges all history and restarts a workflow with a new input).
  • customStatus: Contains a user-defined workflow status value. There is exactly one customStatus key for each workflow actor instance.
  • metadata: Contains meta information about the workflow as a JSON blob and includes details such as the length of the inbox, the length of the history, and a 64-bit integer representing the workflow generation (for cases where the instance ID gets reused). The length information is used to determine which keys need to be read or written to when loading or saving workflow state updates.
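
As an illustration, the keys for a single workflow instance might look like the following in a Redis actor state store (using a hypothetical app ID wfapp and instance ID order123, and assuming the appID||actorType||actorID||key convention Dapr uses for actor state):

wfapp||dapr.internal.default.wfapp.workflow||order123||inbox-000001
wfapp||dapr.internal.default.wfapp.workflow||order123||history-000001
wfapp||dapr.internal.default.wfapp.workflow||order123||history-000002
wfapp||dapr.internal.default.wfapp.workflow||order123||customStatus
wfapp||dapr.internal.default.wfapp.workflow||order123||metadata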

The following diagram illustrates the typical lifecycle of a workflow actor.

Dapr Workflow Actor Flowchart

To summarize:

  1. A workflow actor is activated when it receives a new message.
  2. New messages then trigger the associated workflow code (in your application) to run and return an execution result back to the workflow actor.
  3. Once the result is received, the actor schedules any tasks as necessary.
  4. After scheduling, the actor updates its state in the state store.
  5. Finally, the actor goes idle until it receives another message. During this idle time, the sidecar may decide to unload the workflow actor from memory.

Activity actors

Activity actors are responsible for managing the state and placement of all workflow activity invocations. A new instance of the activity actor is activated for every activity task that gets scheduled by a workflow. The ID of the activity actor is the ID of the workflow combined with a sequence number (sequence numbers start at 0) and the “generation” (incremented each time the workflow reruns via continue as new). For example, if a workflow has an ID of 876bf371, the third activity it schedules has the ID 876bf371::2::1, where 2 is the sequence number and 1 is the generation. If the activity is scheduled again after a continue as new, the ID will be 876bf371::2::2.

No state is stored by activity actors, and instead all resulting data is sent back to the parent workflow actor.

The following diagram illustrates the typical lifecycle of an activity actor.

Workflow Activity Actor Flowchart

Activity actors are short-lived:

  1. Activity actors are activated when a workflow actor schedules an activity task.
  2. Activity actors then immediately call into the workflow application to invoke the associated activity code.
  3. Once the activity code has finished running and has returned its result, the activity actor sends a message to the parent workflow actor with the execution results.
  4. The activity actor then deactivates itself.
  5. Once the results are sent, the workflow is triggered to move forward to its next step.

Reminder usage and execution guarantees

Dapr Workflow ensures fault tolerance by using actor reminders to recover from transient system failures. Prior to invoking application workflow code, the workflow or activity actor creates a new reminder. These reminders are “one shot”, meaning they expire after successfully triggering. If the application code executes without interruption, the reminder is triggered and expires. However, if the node or the sidecar hosting the associated workflow or activity crashes, the reminder reactivates the corresponding actor and the execution is retried indefinitely until it succeeds.

Diagram showing the process of invoking workflow actors

State store usage

Dapr Workflows use actors internally to drive the execution of workflows. Like any actors, these workflow actors store their state in the configured actor state store. Any state store that supports actors implicitly supports Dapr Workflow.
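
For example, a Redis state store is made available to actors (and therefore to workflows) by setting the actorStateStore metadata field to "true". A minimal sketch of such a component definition follows; the component name and connection details are placeholders:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"  # Required for actors, and therefore for Dapr Workflow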

As discussed in the workflow actors section, workflows save their state incrementally by appending to a history log. The history log for a workflow is distributed across multiple state store keys so that each “checkpoint” only needs to append the newest entries.

The size of each checkpoint is determined by the number of concurrent actions scheduled by the workflow before it goes into an idle state. Sequential workflows will therefore make smaller batch updates to the state store, while fan-out/fan-in workflows will require larger batches. The size of the batch is also impacted by the size of inputs and outputs when workflows invoke activities or child workflows.

Diagram of workflow actor state store interactions

Different state store implementations may implicitly put restrictions on the types of workflows you can author. For example, the Azure Cosmos DB state store limits item sizes to 2 MB of UTF-8 encoded JSON (source). The input or output payload of an activity or child workflow is stored as a single record in the state store, so an item limit of 2 MB means that workflow and activity inputs and outputs can’t exceed 2 MB of JSON-serialized data.

Similarly, if a state store imposes restrictions on the size of a batch transaction, that may limit the number of parallel actions that can be scheduled by a workflow.

Workflow state can be purged from a state store, including all its history. Each Dapr SDK exposes APIs for purging all metadata related to specific workflow instances.
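
For example, with the Python SDK (wf_client being a DaprWorkflowClient, as in the authoring examples later in this document):

wf_client.purge_workflow(instance_id='exampleInstanceID')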

Workflow scalability

Because Dapr Workflows are internally implemented using actors, Dapr Workflows have the same scalability characteristics as actors. The placement service:

  • Doesn’t distinguish between workflow actors and actors you define in your application
  • Will load balance workflows using the same algorithms that it uses for actors

The expected scalability of a workflow is determined by the following factors:

  • The number of machines used to host your workflow application
  • The CPU and memory resources available on the machines running workflows
  • The scalability of the state store configured for actors
  • The scalability of the actor placement service and the reminder subsystem

The implementation details of the workflow code in the target application also plays a role in the scalability of individual workflow instances. Each workflow instance executes on a single node at a time, but a workflow can schedule activities and child workflows which run on other nodes.

Workflows can also schedule these activities and child workflows to run in parallel, allowing a single workflow to potentially distribute compute tasks across all available nodes in the cluster.

Diagram of workflow and activity actors scaled out across multiple Dapr instances

You can configure the maximum concurrent workflows and activities that can be executed at any one time with the following configuration. These limits are imposed on a per sidecar basis, meaning that if you have 10 replicas of your workflow app, the effective limit is 10 times the configured value. These limits do not distinguish between different workflow or activity definitions.

apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: appconfig
spec:
  workflow:
    maxConcurrentWorkflowInvocations: 100 # Default is infinite
    maxConcurrentActivityInvocations: 1000 # Default is infinite

Workflows don’t control the specifics of how load is distributed across the cluster. For example, if a workflow schedules 10 activity tasks to run in parallel, all 10 tasks may run on as many as 10 different compute nodes or as few as a single compute node. The actual scale behavior is determined by the actor placement service, which manages the distribution of the actors that represent each of the workflow’s tasks.

Workflow latency

In order to provide guarantees around durability and resiliency, Dapr Workflows frequently write to the state store and rely on reminders to drive execution. Dapr Workflows therefore may not be appropriate for latency-sensitive workloads. Expected sources of high latency include:

  • Latency from the state store when persisting workflow state.
  • Latency from the state store when rehydrating workflows with large histories.
  • Latency caused by too many active reminders in the cluster.
  • Latency caused by high CPU usage in the cluster.

See the Reminder usage and execution guarantees section for more details on how the design of workflow actors may impact execution latency.

Increasing scheduling throughput

By default, when a client schedules a workflow, the workflow engine waits for the workflow to be fully started before returning a response to the client, which can decrease the scheduling throughput of workflows. When a workflow is scheduled with an explicit start time, however, the workflow engine returns a response to the client without waiting for the workflow to start. To increase scheduling throughput, consider passing a start time of “now” when scheduling a workflow, as in the following Go SDK example:

client.ScheduleNewWorkflow(ctx, "MyCoolWorkflow", workflow.WithStartTime(time.Now()))
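
The Python SDK exposes the same option through the start_at parameter of schedule_new_workflow. A minimal sketch (hello_world_wf and input_data as in the authoring examples later in this document; start_at is assumed here to behave like the Go SDK’s WithStartTime):

from datetime import datetime, timezone

wf_client.schedule_new_workflow(
    workflow=hello_world_wf,
    input=input_data,
    start_at=datetime.now(timezone.utc),  # an explicit start time of "now"
)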

Next steps

Author workflows >>

5 - How to: Author a workflow

Learn how to develop and author workflows

This article provides a high-level overview of how to author workflows that are executed by the Dapr Workflow engine.

Author workflows as code

Dapr Workflow logic is implemented using general purpose programming languages, allowing you to:

  • Use your preferred programming language (no need to learn a new DSL or YAML schema).
  • Have access to the language’s standard libraries.
  • Build your own libraries and abstractions.
  • Use debuggers and examine local variables.
  • Write unit tests for your workflows, just like any other part of your application logic.

The Dapr sidecar doesn’t load any workflow definitions. Rather, the sidecar simply drives the execution of the workflows, leaving all the workflow activities to be part of the application.
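
Because the workflow and activity code is ordinary application code, it can be unit tested without a running sidecar. A minimal sketch (the send_alert activity below is a hypothetical example, not one of the activities from this article):

from unittest.mock import Mock

def send_alert(ctx, message: str) -> str:
    # A trivial activity: format and return an alert message
    return f'Alert: {message}'

def test_send_alert():
    ctx = Mock()  # stands in for the SDK's WorkflowActivityContext
    assert send_alert(ctx, 'job unhealthy') == 'Alert: job unhealthy'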

Write the workflow activities

Workflow activities are the basic unit of work in a workflow and are the tasks that get orchestrated in the business process.

Define the workflow activities you’d like your workflow to perform. Activities are function definitions and can take inputs and outputs. The following example creates a counter activity called hello_act that notifies users of the current counter value. hello_act is registered as an activity with the @wfr.activity decorator and receives a WorkflowActivityContext as its first parameter.

@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
    global counter
    counter += wf_input
    print(f'New counter value is: {counter}!', flush=True)

See the task chaining workflow activity in context.

Define the workflow activities you’d like your workflow to perform. Each activity receives a WorkflowActivityContext, a class that wraps the inner ActivityContext and exposes the workflow instance ID and activity task ID.

export default class WorkflowActivityContext {
  private readonly _innerContext: ActivityContext;
  constructor(innerContext: ActivityContext) {
    if (!innerContext) {
      throw new Error("ActivityContext cannot be undefined");
    }
    this._innerContext = innerContext;
  }

  public getWorkflowInstanceId(): string {
    return this._innerContext.orchestrationId;
  }

  public getWorkflowActivityId(): number {
    return this._innerContext.taskId;
  }
}

See the workflow activity in context.

Define the workflow activities you’d like your workflow to perform. Activities are a class definition and can take inputs and outputs. Activities also participate in dependency injection, like binding to a Dapr client.

The activities called in the example below are:

  • NotifyActivity: Receive notification of a new order.
  • ReserveInventoryActivity: Check for sufficient inventory to meet the new order.
  • ProcessPaymentActivity: Process payment for the order. Includes NotifyActivity to send notification of successful order.

NotifyActivity

public class NotifyActivity : WorkflowActivity<Notification, object>
{
    //...

    public NotifyActivity(ILoggerFactory loggerFactory)
    {
        this.logger = loggerFactory.CreateLogger<NotifyActivity>();
    }

    //...
}

See the full NotifyActivity.cs workflow activity example.

ReserveInventoryActivity

public class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
{
    //...

    public ReserveInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
    {
        this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
        this.client = client;
    }

    //...

}

See the full ReserveInventoryActivity.cs workflow activity example.

ProcessPaymentActivity

public class ProcessPaymentActivity : WorkflowActivity<PaymentRequest, object>
{
    //...
    public ProcessPaymentActivity(ILoggerFactory loggerFactory)
    {
        this.logger = loggerFactory.CreateLogger<ProcessPaymentActivity>();
    }

    //...

}

See the full ProcessPaymentActivity.cs workflow activity example.

Define the workflow activities you’d like your workflow to perform. Activities are classes that implement the WorkflowActivity interface, as with the public DemoWorkflowActivity class below.

@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.ANY)
public class DemoWorkflowActivity implements WorkflowActivity {

  @Override
  public DemoActivityOutput run(WorkflowActivityContext ctx) {
    Logger logger = LoggerFactory.getLogger(DemoWorkflowActivity.class);
    logger.info("Starting Activity: " + ctx.getName());

    var message = ctx.getInput(DemoActivityInput.class).getMessage();
    var newMessage = message + " World!, from Activity";
    logger.info("Message Received from input: " + message);
    logger.info("Sending message to output: " + newMessage);

    logger.info("Sleeping for 5 seconds to simulate long running operation...");

    try {
      TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }


    logger.info("Activity finished");

    var output = new DemoActivityOutput(message, newMessage);
    logger.info("Activity returned: " + output);

    return output;
  }
}

See the Java SDK workflow activity example in context.

Define each workflow activity you’d like your workflow to perform. The Activity input can be unmarshalled from the context with ctx.GetInput. Activities should be defined as taking a ctx workflow.ActivityContext parameter and returning an interface and error.

func TestActivity(ctx workflow.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	
	// Do something here
	return "result", nil
}

See the Go SDK workflow activity example in context.

Write the workflow

Next, register and call the activities in a workflow.

The hello_world_wf function is registered as a workflow with the @wfr.workflow decorator and receives a DaprWorkflowContext along with the workflow input. Its yield statements do the heavy lifting of the workflow by calling the workflow activities.

@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
    print(f'{wf_input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
    yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)

    # Change in event handling: Use when_any to handle both event and timeout
    event = ctx.wait_for_external_event(event_name)
    timeout = ctx.create_timer(timedelta(seconds=30))
    winner = yield when_any([event, timeout])

    if winner == timeout:
        print('Workflow timed out waiting for event')
        return 'Timeout'

    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)
    return 'Completed'

See the hello_world_wf workflow in context.

Next, register the workflow with the WorkflowRuntime class and start the workflow runtime.

export default class WorkflowRuntime {

  //..
  // Register workflow implementation for handling orchestrations
  public registerWorkflow(workflow: TWorkflow): WorkflowRuntime {
    const name = getFunctionName(workflow);
    const workflowWrapper = (ctx: OrchestrationContext, input: any): any => {
      const workflowContext = new WorkflowContext(ctx);
      return workflow(workflowContext, input);
    };
    this.worker.addNamedOrchestrator(name, workflowWrapper);
    return this;
  }

  // Register workflow activities
  public registerActivity(fn: TWorkflowActivity<TInput, TOutput>): WorkflowRuntime {
    const name = getFunctionName(fn);
    const activityWrapper = (ctx: ActivityContext, input: TInput): TOutput => {
      const wfActivityContext = new WorkflowActivityContext(ctx);
      return fn(wfActivityContext, input);
    };
    };
    this.worker.addNamedActivity(name, activityWrapper);
    return this;
  }

  // Start the workflow runtime processing items and block.
  public async start() {
    await this.worker.start();
  }

}

See the WorkflowRuntime in context.

The OrderProcessingWorkflow class is derived from a base class called Workflow with input and output parameter types. It also includes a RunAsync method that does the heavy lifting of the workflow and calls the workflow activities.

 class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
    {
        public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
        {
            //...

            await context.CallActivityAsync(
                nameof(NotifyActivity),
                new Notification($"Received order {orderId} for {order.Name} at {order.TotalCost:c}"));

            //...

            InventoryResult result = await context.CallActivityAsync<InventoryResult>(
                nameof(ReserveInventoryActivity),
                new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
            //...
            
            await context.CallActivityAsync(
                nameof(ProcessPaymentActivity),
                new PaymentRequest(RequestId: orderId, order.TotalCost, "USD"));

            await context.CallActivityAsync(
                nameof(NotifyActivity),
                new Notification($"Order {orderId} processed successfully!"));

            // End the workflow with a success result
            return new OrderResult(Processed: true);
        }
    }

See the full workflow example in OrderProcessingWorkflow.cs.

Next, register the workflow with the WorkflowRuntimeBuilder and start the workflow runtime.

public class DemoWorkflowWorker {

  public static void main(String[] args) throws Exception {

    // Register the Workflow with the builder.
    WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(DemoWorkflow.class);
    builder.registerActivity(DemoWorkflowActivity.class);

    // Build and then start the workflow runtime pulling and executing tasks
    try (WorkflowRuntime runtime = builder.build()) {
      System.out.println("Start workflow runtime");
      runtime.start();
    }

    System.exit(0);
  }
}

See the Java SDK workflow in context.

Define your workflow function with the parameter ctx *workflow.WorkflowContext and return any and error. Invoke your defined activities from within your workflow.

func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return nil, err
	}
	var output string
	if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
		return nil, err
	}
	if err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output); err != nil {
		return nil, err
	}
	
	if err := ctx.CreateTimer(time.Second).Await(nil); err != nil {
		return nil, err
	}
	return output, nil
}

See the Go SDK workflow in context.

Write the application

Finally, compose the application using the workflow.

In the following example, for a basic Python hello world application using the Python SDK, your project code would include:

  • The dapr.ext.workflow package, which provides the WorkflowRuntime, DaprWorkflowClient, and workflow context types.
  • Decorators that register the workflow and workflow activities with the runtime.
  • API calls. In the example below, these calls start, pause, resume, raise an event on, purge, and wait for the completion of the workflow.
from datetime import timedelta
from time import sleep
from dapr.ext.workflow import (
    WorkflowRuntime,
    DaprWorkflowContext,
    WorkflowActivityContext,
    RetryPolicy,
    DaprWorkflowClient,
    when_any,
)
from dapr.conf import Settings
from dapr.clients.exceptions import DaprInternalError

settings = Settings()

counter = 0
retry_count = 0
child_orchestrator_count = 0
child_orchestrator_string = ''
child_act_retry_count = 0
instance_id = 'exampleInstanceID'
child_instance_id = 'childInstanceID'
workflow_name = 'hello_world_wf'
child_workflow_name = 'child_wf'
input_data = 'Hi Counter!'
event_name = 'event1'
event_data = 'eventData'
non_existent_id_error = 'no such instance exists'

retry_policy = RetryPolicy(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=3,
    backoff_coefficient=2,
    max_retry_interval=timedelta(seconds=10),
    retry_timeout=timedelta(seconds=100),
)

wfr = WorkflowRuntime()


@wfr.workflow(name='hello_world_wf')
def hello_world_wf(ctx: DaprWorkflowContext, wf_input):
    print(f'{wf_input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.call_activity(hello_retryable_act, retry_policy=retry_policy)
    yield ctx.call_child_workflow(child_retryable_wf, retry_policy=retry_policy)

    # Change in event handling: Use when_any to handle both event and timeout
    event = ctx.wait_for_external_event(event_name)
    timeout = ctx.create_timer(timedelta(seconds=30))
    winner = yield when_any([event, timeout])

    if winner == timeout:
        print('Workflow timed out waiting for event')
        return 'Timeout'

    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)
    return 'Completed'


@wfr.activity(name='hello_act')
def hello_act(ctx: WorkflowActivityContext, wf_input):
    global counter
    counter += wf_input
    print(f'New counter value is: {counter}!', flush=True)


@wfr.activity(name='hello_retryable_act')
def hello_retryable_act(ctx: WorkflowActivityContext):
    global retry_count
    if (retry_count % 2) == 0:
        print(f'Retry count value is: {retry_count}!', flush=True)
        retry_count += 1
        raise ValueError('Retryable Error')
    print(f'Retry count value is: {retry_count}! This print statement verifies retry', flush=True)
    retry_count += 1


@wfr.workflow(name='child_retryable_wf')
def child_retryable_wf(ctx: DaprWorkflowContext):
    global child_orchestrator_string, child_orchestrator_count
    if not ctx.is_replaying:
        child_orchestrator_count += 1
        print(f'Appending {child_orchestrator_count} to child_orchestrator_string!', flush=True)
        child_orchestrator_string += str(child_orchestrator_count)
    yield ctx.call_activity(
        act_for_child_wf, input=child_orchestrator_count, retry_policy=retry_policy
    )
    if child_orchestrator_count < 3:
        raise ValueError('Retryable Error')


@wfr.activity(name='act_for_child_wf')
def act_for_child_wf(ctx: WorkflowActivityContext, inp):
    global child_orchestrator_string, child_act_retry_count
    inp_char = chr(96 + inp)
    print(f'Appending {inp_char} to child_orchestrator_string!', flush=True)
    child_orchestrator_string += inp_char
    if child_act_retry_count % 2 == 0:
        child_act_retry_count += 1
        raise ValueError('Retryable Error')
    child_act_retry_count += 1


def main():
    wfr.start()
    wf_client = DaprWorkflowClient()

    print('==========Start Counter Increase as per Input:==========')
    wf_client.schedule_new_workflow(
        workflow=hello_world_wf, input=input_data, instance_id=instance_id
    )

    wf_client.wait_for_workflow_start(instance_id)

    # Sleep to let the workflow run initial activities
    sleep(12)

    assert counter == 11
    assert retry_count == 2
    assert child_orchestrator_string == '1aa2bb3cc'

    # Pause Test
    wf_client.pause_workflow(instance_id=instance_id)
    metadata = wf_client.get_workflow_state(instance_id=instance_id)
    print(f'Get response from {workflow_name} after pause call: {metadata.runtime_status.name}')

    # Resume Test
    wf_client.resume_workflow(instance_id=instance_id)
    metadata = wf_client.get_workflow_state(instance_id=instance_id)
    print(f'Get response from {workflow_name} after resume call: {metadata.runtime_status.name}')

    sleep(2)  # Give the workflow time to reach the event wait state
    wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)

    print('========= Waiting for Workflow completion', flush=True)
    try:
        state = wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
        if state.runtime_status.name == 'COMPLETED':
            print('Workflow completed! Result: {}'.format(state.serialized_output.strip('"')))
        else:
            print(f'Workflow failed! Status: {state.runtime_status.name}')
    except TimeoutError:
        print('*** Workflow timed out!')

    wf_client.purge_workflow(instance_id=instance_id)
    try:
        wf_client.get_workflow_state(instance_id=instance_id)
    except DaprInternalError as err:
        if non_existent_id_error in err._message:
            print('Instance Successfully Purged')

    wfr.shutdown()


if __name__ == '__main__':
    main()

The following example is a basic JavaScript application using the JavaScript SDK. As in this example, your project code would include:

  • A DaprWorkflowClient for managing workflow instances.
  • API calls. In the example below, these calls schedule, terminate, get the status of, wait for, raise events on, and purge the workflow.
import { TaskHubGrpcClient } from "@microsoft/durabletask-js";
import { WorkflowState } from "./WorkflowState";
import { generateApiTokenClientInterceptors, generateEndpoint, getDaprApiToken } from "../internal/index";
import { TWorkflow } from "../../types/workflow/Workflow.type";
import { getFunctionName } from "../internal";
import { WorkflowClientOptions } from "../../types/workflow/WorkflowClientOption";

/** DaprWorkflowClient class defines client operations for managing workflow instances. */

export default class DaprWorkflowClient {
  private readonly _innerClient: TaskHubGrpcClient;

  /** Initialize a new instance of the DaprWorkflowClient.
   */
  constructor(options: Partial<WorkflowClientOptions> = {}) {
    const grpcEndpoint = generateEndpoint(options);
    options.daprApiToken = getDaprApiToken(options);
    this._innerClient = this.buildInnerClient(grpcEndpoint.endpoint, options);
  }

  private buildInnerClient(hostAddress: string, options: Partial<WorkflowClientOptions>): TaskHubGrpcClient {
    let innerOptions = options?.grpcOptions;
    if (options.daprApiToken !== undefined && options.daprApiToken !== "") {
      innerOptions = {
        ...innerOptions,
        interceptors: [generateApiTokenClientInterceptors(options), ...(innerOptions?.interceptors ?? [])],
      };
    }
    return new TaskHubGrpcClient(hostAddress, innerOptions);
  }

  /**
   * Schedule a new workflow using the DurableTask client.
   */
  public async scheduleNewWorkflow(
    workflow: TWorkflow | string,
    input?: any,
    instanceId?: string,
    startAt?: Date,
  ): Promise<string> {
    if (typeof workflow === "string") {
      return await this._innerClient.scheduleNewOrchestration(workflow, input, instanceId, startAt);
    }
    return await this._innerClient.scheduleNewOrchestration(getFunctionName(workflow), input, instanceId, startAt);
  }

  /**
   * Terminate the workflow associated with the provided instance id.
   *
   * @param {string} workflowInstanceId - Workflow instance id to terminate.
   * @param {any} output - The optional output to set for the terminated workflow instance.
   */
  public async terminateWorkflow(workflowInstanceId: string, output: any) {
    await this._innerClient.terminateOrchestration(workflowInstanceId, output);
  }

  /**
   * Fetch workflow instance metadata from the configured durable store.
   */
  public async getWorkflowState(
    workflowInstanceId: string,
    getInputsAndOutputs: boolean,
  ): Promise<WorkflowState | undefined> {
    const state = await this._innerClient.getOrchestrationState(workflowInstanceId, getInputsAndOutputs);
    if (state !== undefined) {
      return new WorkflowState(state);
    }
  }

  /**
   * Waits for a workflow to start running
   */
  public async waitForWorkflowStart(
    workflowInstanceId: string,
    fetchPayloads = true,
    timeoutInSeconds = 60,
  ): Promise<WorkflowState | undefined> {
    const state = await this._innerClient.waitForOrchestrationStart(
      workflowInstanceId,
      fetchPayloads,
      timeoutInSeconds,
    );
    if (state !== undefined) {
      return new WorkflowState(state);
    }
  }

  /**
   * Waits for a workflow to complete running
   */
  public async waitForWorkflowCompletion(
    workflowInstanceId: string,
    fetchPayloads = true,
    timeoutInSeconds = 60,
  ): Promise<WorkflowState | undefined> {
    const state = await this._innerClient.waitForOrchestrationCompletion(
      workflowInstanceId,
      fetchPayloads,
      timeoutInSeconds,
    );
    if (state != undefined) {
      return new WorkflowState(state);
    }
  }

  /**
   * Sends an event notification message to an awaiting workflow instance
   */
  public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) {
    this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload);
  }

  /**
   * Purges the workflow instance state from the workflow state store.
   */
  public async purgeWorkflow(workflowInstanceId: string): Promise<boolean> {
    const purgeResult = await this._innerClient.purgeOrchestration(workflowInstanceId);
    if (purgeResult !== undefined) {
      return purgeResult.deletedInstanceCount > 0;
    }
    return false;
  }

  /**
   * Closes the inner DurableTask client and shuts down the gRPC channel.
   */
  public async stop() {
    await this._innerClient.stop();
  }
}

In the following Program.cs example for a basic ASP.NET order processing application using the .NET SDK, your project code would include:

  • A NuGet package called Dapr.Workflow to receive the .NET SDK capabilities
  • A builder with an extension method called AddDaprWorkflow
    • This will allow you to register workflows and workflow activities (tasks that workflows can schedule)
  • HTTP API calls
    • One for submitting a new order
    • One for checking the status of an existing order
using Dapr.Workflow;
//...

// Dapr Workflows are registered as part of the service configuration
builder.Services.AddDaprWorkflow(options =>
{
    // Note that it's also possible to register a lambda function as the workflow
    // or activity implementation instead of a class.
    options.RegisterWorkflow<OrderProcessingWorkflow>();

    // These are the activities that get invoked by the workflow(s).
    options.RegisterActivity<NotifyActivity>();
    options.RegisterActivity<ReserveInventoryActivity>();
    options.RegisterActivity<ProcessPaymentActivity>();
});

WebApplication app = builder.Build();

// POST starts new order workflow instance
app.MapPost("/orders", async (DaprWorkflowClient client, [FromBody] OrderPayload orderInfo) =>
{
    if (orderInfo?.Name == null)
    {
        return Results.BadRequest(new
        {
            message = "Order data was missing from the request",
            example = new OrderPayload("Paperclips", 99.95),
        });
    }

//...
});

// GET fetches state for order workflow to report status
app.MapGet("/orders/{orderId}", async (string orderId, DaprWorkflowClient client) =>
{
    WorkflowState state = await client.GetWorkflowStateAsync(orderId, true);
    if (!state.Exists)
    {
        return Results.NotFound($"No order with ID = '{orderId}' was found.");
    }

    var httpResponsePayload = new
    {
        details = state.ReadInputAs<OrderPayload>(),
        status = state.RuntimeStatus.ToString(),
        result = state.ReadOutputAs<OrderResult>(),
    };

//...
}).WithName("GetOrderInfoEndpoint");

app.Run();

As in the following example, a hello-world application using the Java SDK and Dapr Workflow would include:

  • A Java package called io.dapr.workflows.client to receive the Java SDK client capabilities.
  • An import of io.dapr.workflows.Workflow
  • The DemoWorkflow class which extends Workflow
  • Creating the workflow with input and output.
  • API calls. In the example below, these calls start and call the workflow activities.
package io.dapr.examples.workflows;

import com.microsoft.durabletask.CompositeTaskFailedException;
import com.microsoft.durabletask.Task;
import com.microsoft.durabletask.TaskCanceledException;
import io.dapr.workflows.Workflow;
import io.dapr.workflows.WorkflowStub;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;

/**
 * Implementation of the DemoWorkflow for the server side.
 */
public class DemoWorkflow extends Workflow {
  @Override
  public WorkflowStub create() {
    return ctx -> {
      ctx.getLogger().info("Starting Workflow: " + ctx.getName());
      // ...
      ctx.getLogger().info("Calling Activity...");
      var input = new DemoActivityInput("Hello Activity!");
      var output = ctx.callActivity(DemoWorkflowActivity.class.getName(), input, DemoActivityOutput.class).await();
      // ...
    };
  }
}

See the full Java SDK workflow example in context.

As in the following example, a hello-world application using the Go SDK and Dapr Workflow would include:

  • A Go package called client to receive the Go SDK client capabilities.
  • The TestWorkflow method
  • Creating the workflow with input and output.
  • API calls. In the example below, these calls start and call the workflow activities.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/dapr/durabletask-go/api"
	"github.com/dapr/durabletask-go/backend"
	"github.com/dapr/durabletask-go/client"
	"github.com/dapr/durabletask-go/task"
	dapr "github.com/dapr/go-sdk/client"
)

var stage = 0

const (
	workflowComponent = "dapr"
)

func main() {
	registry := task.NewTaskRegistry()

	if err := registry.AddOrchestrator(TestWorkflow); err != nil {
		log.Fatal(err)
	}
	fmt.Println("TestWorkflow registered")

	if err := registry.AddActivity(TestActivity); err != nil {
		log.Fatal(err)
	}
	fmt.Println("TestActivity registered")

	daprClient, err := dapr.NewClient()
	if err != nil {
		log.Fatalf("failed to create Dapr client: %v", err)
	}

	client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
	if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
		log.Fatalf("failed to start work item listener: %v", err)
	}

	fmt.Println("runner started")

	ctx := context.Background()

	// Start workflow test
	id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow", api.WithInput(1))
	if err != nil {
		log.Fatalf("failed to start workflow: %v", err)
	}
	fmt.Printf("workflow started with id: %v\n", id)

	// Pause workflow test
	err = client.SuspendOrchestration(ctx, id, "")
	if err != nil {
		log.Fatalf("failed to pause workflow: %v", err)
	}

	respGet, err := client.FetchOrchestrationMetadata(ctx, id)
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}
	fmt.Printf("workflow paused: %s\n", respGet.RuntimeStatus)

	// Resume workflow test
	err = client.ResumeOrchestration(ctx, id, "")
	if err != nil {
		log.Fatalf("failed to resume workflow: %v", err)
	}

	respGet, err = client.FetchOrchestrationMetadata(ctx, id)
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}
	fmt.Printf("workflow resumed: %s\n", respGet.RuntimeStatus)

	fmt.Printf("stage: %d\n", stage)

	// Raise Event Test
	err = client.RaiseEvent(ctx, id, "testEvent", api.WithEventPayload("testData"))
	if err != nil {
		fmt.Printf("failed to raise event: %v", err)
	}

	fmt.Println("workflow event raised")

	time.Sleep(time.Second) // allow workflow to advance

	fmt.Printf("stage: %d\n", stage)

	respGet, err = client.FetchOrchestrationMetadata(ctx, id)
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}

	fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)

	// Purge workflow test
	err = client.PurgeOrchestrationState(ctx, id)
	if err != nil {
		log.Fatalf("failed to purge workflow: %v", err)
	}
	fmt.Println("workflow purged")
}

func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return nil, err
	}
	var output string
	if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
		return nil, err
	}

	err := ctx.WaitForSingleEvent("testEvent", time.Second*60).Await(&output)
	if err != nil {
		return nil, err
	}

	if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
		return nil, err
	}

	return output, nil
}

func TestActivity(ctx task.ActivityContext) (any, error) {
	var input int
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}

	stage += input

	return fmt.Sprintf("Stage: %d", stage), nil
}

See the full Go SDK workflow example in context.

Next steps

Now that you’ve authored a workflow, learn how to manage it.

Manage workflows >>

6 - How to: Manage workflows

Manage and run workflows

Now that you’ve authored the workflow and its activities in your application, you can start, terminate, and get information about the workflow using HTTP API calls. For more information, read the workflow API reference.

Manage your workflow within your code. In the workflow example from the Author a workflow guide, the workflow is registered in the code. You can then manage it using the following APIs:

  • schedule_new_workflow: Start an instance of a workflow
  • get_workflow_state: Get information on the status of the workflow
  • pause_workflow: Pauses or suspends a workflow instance that can later be resumed
  • resume_workflow: Resumes a paused workflow instance
  • raise_workflow_event: Raise an event on a workflow
  • purge_workflow: Removes all metadata related to a specific workflow instance
  • wait_for_workflow_completion: Wait for a workflow instance to complete its execution
from dapr.ext.workflow import DaprWorkflowClient

# Sample parameters
instance_id = "exampleInstanceID"
workflow_component = "dapr"
workflow_name = "hello_world_wf"
event_name = "event1"
event_data = "eventData"

wf_client = DaprWorkflowClient()

# Start the workflow
wf_client.schedule_new_workflow(
    workflow=hello_world_wf, input=input_data, instance_id=instance_id
)

# Get info on the workflow
wf_client.get_workflow_state(instance_id=instance_id)

# Pause the workflow
wf_client.pause_workflow(instance_id=instance_id)
metadata = wf_client.get_workflow_state(instance_id=instance_id)

# Resume the workflow
wf_client.resume_workflow(instance_id=instance_id)

# Raise an event on the workflow
wf_client.raise_workflow_event(instance_id=instance_id, event_name=event_name, data=event_data)

# Purge the workflow
wf_client.purge_workflow(instance_id=instance_id)

# Wait for workflow completion
wf_client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)

Manage your workflow within your code. In the workflow example from the Author a workflow guide, the workflow is registered in the code. You can then manage it using the following APIs:

  • client.workflow.start: Start an instance of a workflow
  • client.workflow.get: Get information on the status of the workflow
  • client.workflow.pause: Pauses or suspends a workflow instance that can later be resumed
  • client.workflow.resume: Resumes a paused workflow instance
  • client.workflow.purge: Removes all metadata related to a specific workflow instance
  • client.workflow.terminate: Terminate or stop a particular instance of a workflow
import { DaprClient } from "@dapr/dapr";

async function printWorkflowStatus(client: DaprClient, instanceId: string) {
  const workflow = await client.workflow.get(instanceId);
  console.log(
    `Workflow ${workflow.workflowName}, created at ${workflow.createdAt.toUTCString()}, has status ${
      workflow.runtimeStatus
    }`,
  );
  console.log(`Additional properties: ${JSON.stringify(workflow.properties)}`);
  console.log("--------------------------------------------------\n\n");
}

async function start() {
  const client = new DaprClient();

  // Start a new workflow instance
  const instanceId = await client.workflow.start("OrderProcessingWorkflow", {
    Name: "Paperclips",
    TotalCost: 99.95,
    Quantity: 4,
  });
  console.log(`Started workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // Pause a workflow instance
  await client.workflow.pause(instanceId);
  console.log(`Paused workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // Resume a workflow instance
  await client.workflow.resume(instanceId);
  console.log(`Resumed workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // Terminate a workflow instance
  await client.workflow.terminate(instanceId);
  console.log(`Terminated workflow instance ${instanceId}`);
  await printWorkflowStatus(client, instanceId);

  // Wait 30 seconds, then check the workflow status again
  await new Promise((resolve) => setTimeout(resolve, 30000));
  await printWorkflowStatus(client, instanceId);

  // Purge a workflow instance
  await client.workflow.purge(instanceId);
  console.log(`Purged workflow instance ${instanceId}`);
  // This will throw an error because the workflow instance no longer exists.
  await printWorkflowStatus(client, instanceId);
}

start().catch((e) => {
  console.error(e);
  process.exit(1);
});

Manage your workflow within your code. In the OrderProcessingWorkflow example from the Author a workflow guide, the workflow is registered in the code. You can now start, terminate, and get information about a running workflow:

string orderId = "exampleOrderId";
OrderPayload input = new OrderPayload("Paperclips", 99.95);
Dictionary<string, string>? workflowOptions = null; // This is an optional parameter

// Start the workflow using the orderId as our workflow ID. This returns a string containing the instance ID for the particular workflow instance, whether we provide it ourselves or not.
await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(OrderProcessingWorkflow), orderId, input, workflowOptions);

// Get information on the workflow. This response contains information such as the status of the workflow, when it started, and more!
WorkflowState currentState = await daprWorkflowClient.GetWorkflowStateAsync(orderId, true);

// Terminate the workflow
await daprWorkflowClient.TerminateWorkflowAsync(orderId);

// Raise an event (an incoming purchase order) that your workflow will wait for
await daprWorkflowClient.RaiseEventAsync(orderId, "incoming-purchase-order", input);

// Pause
await daprWorkflowClient.SuspendWorkflowAsync(orderId);

// Resume
await daprWorkflowClient.ResumeWorkflowAsync(orderId);

// Purge the workflow, removing all inbox and history information from associated instance
await daprWorkflowClient.PurgeInstanceAsync(orderId);

Manage your workflow within your code. In the workflow example from the Java SDK, the workflow is registered in the code. You can then manage it using the following APIs:

  • scheduleNewWorkflow: Starts a new workflow instance
  • getInstanceState: Get information on the status of the workflow
  • waitForInstanceStart: Waits for the workflow instance to start running
  • raiseEvent: Raises events/tasks for the running workflow instance
  • waitForInstanceCompletion: Waits for the workflow to complete its tasks
  • terminateWorkflow: Terminates the workflow
  • purgeInstance: Removes all metadata related to a specific workflow instance
package io.dapr.examples.workflows;

import io.dapr.workflows.client.DaprWorkflowClient;
import io.dapr.workflows.client.WorkflowInstanceStatus;

// ...
public class DemoWorkflowClient {

  // ...
  public static void main(String[] args) throws InterruptedException {
    DaprWorkflowClient client = new DaprWorkflowClient();

    try (client) {
      // Start a workflow
      String instanceId = client.scheduleNewWorkflow(DemoWorkflow.class, "input data");

      // Get status information on the workflow
      WorkflowInstanceStatus workflowMetadata = client.getInstanceState(instanceId, true);

      // Wait for the workflow instance to start
      try {
        WorkflowInstanceStatus waitForInstanceStartResult =
            client.waitForInstanceStart(instanceId, Duration.ofSeconds(60), true);
      } catch (TimeoutException ex) {
        // The instance did not start within the timeout
      }

      // Raise an event for the workflow; you can raise several events in parallel
      client.raiseEvent(instanceId, "TestEvent", "TestEventPayload");
      client.raiseEvent(instanceId, "event1", "TestEvent 1 Payload");
      client.raiseEvent(instanceId, "event2", "TestEvent 2 Payload");
      client.raiseEvent(instanceId, "event3", "TestEvent 3 Payload");

      // Wait for workflow to complete running through tasks
      try {
        WorkflowInstanceStatus waitForInstanceCompletionResult =
            client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(60), true);
      } catch (TimeoutException ex) {
        // The instance did not complete within the timeout
      }

      // Purge the workflow instance, removing all metadata associated with it
      boolean purgeResult = client.purgeInstance(instanceId);

      // Terminate the workflow instance
      client.terminateWorkflow(instanceId, null);
    }

    System.exit(0);
  }
}

Manage your workflow within your code. In the workflow example from the Go SDK, the workflow is registered in the code. You can then manage it using the following APIs (a usage sketch follows the request and response type definitions below):

  • StartWorkflow: Starts a new workflow instance
  • GetWorkflow: Get information on the status of the workflow
  • PauseWorkflow: Pauses or suspends a workflow instance that can later be resumed
  • ResumeWorkflow: Resumes a paused workflow instance
  • RaiseEventWorkflow: Raises events/tasks for the running workflow instance
  • PurgeWorkflow: Removes all metadata related to a specific workflow instance
  • TerminateWorkflow: Terminates the workflow
// Start workflow
type StartWorkflowRequest struct {
	InstanceID        string // Optional instance identifier
	WorkflowComponent string
	WorkflowName      string
	Options           map[string]string // Optional metadata
	Input             any               // Optional input
	SendRawInput      bool              // Set to True in order to disable serialization on the input
}

type StartWorkflowResponse struct {
	InstanceID string
}

// Get the workflow status
type GetWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

type GetWorkflowResponse struct {
	InstanceID    string
	WorkflowName  string
	CreatedAt     time.Time
	LastUpdatedAt time.Time
	RuntimeStatus string
	Properties    map[string]string
}

// Purge workflow
type PurgeWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// Terminate workflow
type TerminateWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// Pause workflow
type PauseWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// Resume workflow
type ResumeWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
}

// Raise an event for the running workflow
type RaiseEventWorkflowRequest struct {
	InstanceID        string
	WorkflowComponent string
	EventName         string
	EventData         any
	SendRawData       bool // Set to True in order to disable serialization on the data
}
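
The following is a minimal sketch of how these request types fit together. It assumes the Dapr Go client exposes methods matching the names in the list above (exact method names and signatures vary by SDK version) and reuses the TestWorkflow name from the authoring example:

package main

import (
	"context"
	"fmt"
	"log"

	dapr "github.com/dapr/go-sdk/client"
)

func main() {
	ctx := context.Background()

	daprClient, err := dapr.NewClient()
	if err != nil {
		log.Fatalf("failed to create Dapr client: %v", err)
	}
	defer daprClient.Close()

	// Start a workflow instance. The fields mirror the StartWorkflowRequest
	// struct shown above; the StartWorkflow method name is an assumption.
	resp, err := daprClient.StartWorkflow(ctx, &dapr.StartWorkflowRequest{
		InstanceID:        "exampleInstanceID", // optional; generated when empty
		WorkflowComponent: "dapr",
		WorkflowName:      "TestWorkflow",
		Input:             1,
	})
	if err != nil {
		log.Fatalf("failed to start workflow: %v", err)
	}

	// Fetch the status of the instance that was just started.
	getResp, err := daprClient.GetWorkflow(ctx, &dapr.GetWorkflowRequest{
		InstanceID:        resp.InstanceID,
		WorkflowComponent: "dapr",
	})
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}
	fmt.Printf("workflow %s status: %s\n", resp.InstanceID, getResp.RuntimeStatus)
}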

Manage your workflow using HTTP calls. The example below plugs in the properties from the Author a workflow example with a random instance ID number.

Start workflow

To start your workflow with an ID 12345678, run:

curl -X POST "http://localhost:3500/v1.0/workflows/dapr/OrderProcessingWorkflow/start?instanceID=12345678"

Note that workflow instance IDs can only contain alphanumeric characters, underscores, and dashes.
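
If you generate instance IDs yourself, a small client-side check can catch invalid IDs before the API call. This is only a sketch; the regular expression simply mirrors the constraint stated above:

package main

import (
	"log"
	"regexp"
)

// validInstanceID mirrors the documented constraint: alphanumeric characters,
// underscores, and dashes only.
var validInstanceID = regexp.MustCompile(`^[A-Za-z0-9_-]+$`)

func main() {
	id := "12345678"
	if !validInstanceID.MatchString(id) {
		log.Fatalf("invalid workflow instance ID: %q", id)
	}
	log.Printf("instance ID %q is valid", id)
}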

Terminate workflow

To terminate your workflow with an ID 12345678, run:

curl -X POST "http://localhost:3500/v1.0/workflows/dapr/12345678/terminate"

Raise an event

For workflow components that support subscribing to external events, such as the Dapr Workflow engine, you can use the following “raise event” API to deliver a named event to a specific workflow instance.

curl -X POST "http://localhost:3500/v1.0/workflows/<workflowComponentName>/<instanceID>/raiseEvent/<eventName>"

An eventName can be any arbitrary string.

Pause or resume a workflow

To plan for down-time, wait for inputs, and more, you can pause and then resume a workflow. To pause a workflow with an ID 12345678 until triggered to resume, run:

curl -X POST "http://localhost:3500/v1.0/workflows/dapr/12345678/pause"

To resume a workflow with an ID 12345678, run:

curl -X POST "http://localhost:3500/v1.0/workflows/dapr/12345678/resume"

Purge a workflow

The purge API can be used to permanently delete workflow metadata from the underlying state store, including any stored inputs, outputs, and workflow history records. This is often useful for implementing data retention policies and for freeing resources.

Only workflow instances in the COMPLETED, FAILED, or TERMINATED state can be purged. If the workflow is in any other state, calling purge returns an error.

curl -X POST "http://localhost:3500/v1.0/workflows/dapr/12345678/purge"
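
Programmatically, you can guard the purge call on the instance’s runtime status. The following is a minimal Go sketch reusing the client, ctx, and id variables from the authoring example earlier on this page; it assumes the runtime status formats as the COMPLETED, FAILED, and TERMINATED strings shown above, which may differ by SDK version:

// Only purge once the instance has reached a terminal state.
meta, err := client.FetchOrchestrationMetadata(ctx, id)
if err != nil {
	log.Fatalf("failed to get workflow: %v", err)
}
// Assumption: the status renders as "COMPLETED", "FAILED", or "TERMINATED".
switch fmt.Sprintf("%v", meta.RuntimeStatus) {
case "COMPLETED", "FAILED", "TERMINATED":
	if err := client.PurgeOrchestrationState(ctx, id); err != nil {
		log.Fatalf("failed to purge workflow: %v", err)
	}
	fmt.Println("workflow purged")
default:
	fmt.Printf("workflow not in a terminal state (%v); skipping purge\n", meta.RuntimeStatus)
}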

Get information about a workflow

To fetch workflow information (outputs and inputs) with an ID 12345678, run:

curl -X GET "http://localhost:3500/v1.0/workflows/dapr/12345678"

Learn more about these HTTP calls in the workflow API reference guide.

Next steps

Now that you’ve learned how to manage workflows, learn how to execute workflows across multiple applications.

Multi Application Workflows >>

7 - Multi Application Workflows

Executing workflows across multiple applications

It is often the case that a single workflow spans multiple applications, microservices, or programming languages, where an activity or a child workflow is executed on a different application than the one hosting the parent workflow. Some scenarios where this is useful include:

  • A Machine Learning (ML) training activity must be executed on GPU-enabled machines, while the rest of the workflow runs on CPU-only orchestration machines.
  • Activities need access to sensitive data or credentials that are only available to particular identities or locales.
  • Different parts of the workflow need to be executed in different trust zones or networks.
  • Different parts of the workflow need to be executed in different geographic regions due to data residency requirements.
  • An involved business process spans multiple teams or departments, each owning their own application.
  • Implementation of a workflow spans different programming languages based on team expertise or existing codebases.

Multi-application workflows

Like all building blocks in Dapr, workflow execution routing is based on the App ID of the hosting Dapr application. By default, the full workflow execution is hosted on the app ID that started the workflow. This workflow will be executed across all replicas of that app ID, not just the single replica which scheduled the workflow.

It is possible to execute activities or child workflows on different app IDs by specifying the target app ID parameter inside the workflow execution code. Upon execution, the target app ID executes the activity or child workflow and returns the result to the parent workflow on the originating app ID. Because workflows are durable, if the target activity or child workflow app ID is not available or has not been defined, the parent workflow retries indefinitely until the target app ID becomes available. It is therefore paramount that there is coordination between the teams owning the different app IDs, to ensure that the activities and child workflows are defined and available when needed.

The entire workflow execution may be distributed across any number of app IDs, with each activity or child workflow specifying its target app ID. The final history of the workflow is saved by the app ID that hosts the top-level parent (root) workflow.

Multi-application activity examples

The following examples show how to execute activities on different target app IDs.

package main

import (
	"context"
	"log"

	"github.com/dapr/durabletask-go/backend"
	"github.com/dapr/durabletask-go/client"
	"github.com/dapr/durabletask-go/task"
	dapr "github.com/dapr/go-sdk/client"
)

func main() {
	ctx := context.Background()

	registry := task.NewTaskRegistry()
	if err := registry.AddOrchestrator(TestWorkflow); err != nil {
		log.Fatal(err)
	}

	daprClient, err := dapr.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
	if err := client.StartWorkItemListener(ctx, registry); err != nil {
		log.Fatal(err)
	}

	id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow")
	if err != nil {
		log.Fatal(err)
	}

	if _, err = client.WaitForOrchestrationCompletion(ctx, id); err != nil {
		log.Fatal(err)
	}
}

func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
	var output string
	err := ctx.CallActivity("my-other-activity",
		task.WithActivityInput("my-input"),
		// Here we set the custom target app ID that will execute this activity.
		task.WithActivityAppID("my-other-app-id"),
	).Await(&output)
	if err != nil {
		return nil, err
	}

	return output, nil
}

public class CrossAppWorkflow implements Workflow {
  @Override
  public WorkflowStub create() {
      return ctx -> {
          var logger = ctx.getLogger();
          logger.info("Starting CrossAppWorkflow: {}", ctx.getName());
          logger.info("Workflow instance ID: {}", ctx.getInstanceId());

          String input = ctx.getInput(String.class);
          logger.info("CrossAppWorkflow received input: {}", input);

          // Call an activity in another app by passing an app ID to the WorkflowTaskOptions
          logger.info("Calling cross-app activity in 'app2'...");
          String crossAppResult = ctx.callActivity(
                  App2TransformActivity.class.getName(),
                  input,
                  new WorkflowTaskOptions("app2"),
                  String.class
          ).await();

          // Call another activity in a different app
          logger.info("Calling cross-app activity in 'app3'...");
          String finalResult = ctx.callActivity(
                  App3FinalizeActivity.class.getName(),
                  crossAppResult,
                  new WorkflowTaskOptions("app3"),
                  String.class
          ).await();
          logger.info("Final cross-app activity result: {}", finalResult);

          logger.info("CrossAppWorkflow finished with: {}", finalResult);
          ctx.complete(finalResult);
      };
  }
}

The following examples show how to execute child workflows on different target app IDs.

package main

import (
	"context"
	"log"

	"github.com/dapr/durabletask-go/backend"
	"github.com/dapr/durabletask-go/client"
	"github.com/dapr/durabletask-go/task"
	dapr "github.com/dapr/go-sdk/client"
)

func main() {
	ctx := context.Background()

	registry := task.NewTaskRegistry()
	if err := registry.AddOrchestrator(TestWorkflow); err != nil {
		log.Fatal(err)
	}

	daprClient, err := dapr.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
	if err := client.StartWorkItemListener(ctx, registry); err != nil {
		log.Fatal(err)
	}

	id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow")
	if err != nil {
		log.Fatal(err)
	}

	if _, err = client.WaitForOrchestrationCompletion(ctx, id); err != nil {
		log.Fatal(err)
	}
}

func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
	var output string
	err := ctx.CallSubOrchestrator("my-sub-orchestration",
		task.WithSubOrchestratorInput("my-input"),
		// Here we set the custom target app ID that will execute this child workflow.
		task.WithSubOrchestratorAppID("my-sub-app-id"),
	).Await(&output)
	if err != nil {
		return nil, err
	}

	return output, nil
}