TnsAI
Coordination

Workflows

TnsAI.Coordination provides two workflow executors: `DAGExecutor` for parallel dependency-graph execution and `SagaExecutor` for sequential execution with compensation (rollback) on failure.

WorkflowStep

A workflow step represents one unit of work -- a single agent doing a single task. Steps can depend on other steps, have timeouts, and include compensation actions for rollback. The step is defined as an immutable record.

public record WorkflowStep(
    String id,
    String name,
    Agent agent,
    List<String> dependencies,
    Optional<Predicate<WorkflowContext>> condition,
    Optional<WorkflowStep> compensation,
    long timeoutMs
)

Creating Steps

Steps are created using factory methods and can be customized with dependencies, compensation actions, conditions, and timeouts.

// Simple step (no dependencies)
WorkflowStep fetch = WorkflowStep.of("fetch-data", fetchAgent);

// Step with dependencies
WorkflowStep merge = WorkflowStep.of("merge", mergeAgent, List.of("fetch-a", "fetch-b"));

// Step with compensation (for saga pattern)
WorkflowStep book = WorkflowStep.of("book-flight", bookAgent)
    .withCompensation(WorkflowStep.of("cancel-flight", cancelAgent));

// Conditional step (only runs if predicate is true)
WorkflowStep notify = WorkflowStep.of("notify", notifyAgent)
    .withCondition(ctx -> ctx.has("step.merge.output"));

// Step with timeout
WorkflowStep slow = WorkflowStep.of("external-call", apiAgent)
    .withTimeout(30_000); // 30 seconds

isRoot() returns true when the step has no dependencies.

WorkflowContext

The workflow context is a shared data store that all steps in a workflow can read from and write to. It holds initial parameters, intermediate results, and step outputs. It is thread-safe (uses ConcurrentHashMap internally) so multiple DAG steps running in parallel can access it safely.

WorkflowContext context = new WorkflowContext("workflow-123");

// Store/retrieve arbitrary data
context.put("inputFile", "/data/report.csv");
String file = context.get("inputFile");
String fallback = context.getOrDefault("missing", "default-value");
boolean exists = context.has("inputFile");

// Step results are recorded automatically
context.recordStepResult("fetch", StepResult.success("fetch", "data...", 250));
WorkflowContext.StepResult result = context.getStepResult("fetch");
Map<String, WorkflowContext.StepResult> all = context.getAllStepResults();

You can pass initial data at construction:

WorkflowContext context = new WorkflowContext("wf-1", Map.of(
    "region", "us-east-1",
    "maxRetries", 3
));

StepResult

Each completed step produces a StepResult that records whether it succeeded or failed, the output or error message, and how long it took.

public record StepResult(String stepId, boolean success, String output, long durationMs, String error) {
    static StepResult success(String stepId, String output, long durationMs);
    static StepResult failure(String stepId, String error, long durationMs);
}

WorkflowResult

After a workflow finishes (whether it succeeds or fails), you get a WorkflowResult containing the overall success status, all step results, total duration, and any error message.

public record WorkflowResult(
    String workflowId,
    boolean success,
    List<StepResult> stepResults,
    long totalDurationMs,
    String error
) {
    long completedCount();  // number of successful steps
    long failedCount();     // number of failed steps
}

DAGExecutor

The DAG (Directed Acyclic Graph) executor is for workflows where some steps can run in parallel because they don't depend on each other. You declare each step's dependencies, and the executor automatically figures out what can run concurrently and what must wait. Steps with no dependencies run in parallel; dependent steps wait for all prerequisites to complete.

WorkflowResult result = DAGExecutor.builder()
    .step("fetch-a", fetchAgentA)
    .step("fetch-b", fetchAgentB)
    .step("merge", mergeAgent, List.of("fetch-a", "fetch-b"))
    .step("report", reportAgent, List.of("merge"))
    .timeout(Duration.ofMinutes(10))
    .build()
    .execute(new WorkflowContext("dag-1"));

How It Works

Here is what happens internally when the DAG executor runs your workflow:

  1. Validation: On build(), the DAG is validated -- all dependency references must exist, and cycle detection runs via topological sort (Kahn's algorithm).
  2. Topological Sort: Steps are ordered so that dependencies execute before dependents.
  3. Parallel Execution: Root steps (no dependencies) start immediately on a cached thread pool. Each dependent step waits via CompletableFuture.allOf() on its prerequisites.
  4. Dependency Failure: If a dependency fails, the dependent step is marked as failed without execution.
  5. Conditional Steps: Steps with a condition predicate are evaluated before execution. If the condition returns false, the step is marked as SKIPPED.
  6. Context Propagation: Each step receives a prompt that includes outputs from its dependency steps, enabling data flow through the DAG.

Async Execution

If you don't want to block the calling thread, use executeAsync() to get a CompletableFuture that completes when the DAG finishes.

CompletableFuture<WorkflowResult> future = dag.executeAsync(context);
future.thenAccept(result -> {
    System.out.println("DAG completed: " + result.success());
});

Timeout

You can set both a global timeout for the entire DAG and individual timeouts per step. The overall DAG execution timeout defaults to 5 minutes. Individual steps can also have their own timeout set via withTimeout().

SagaExecutor

The Saga executor is for workflows where you need "all or nothing" semantics. Steps run in sequence, and if any step fails, the executor automatically runs compensation (undo) actions in reverse order for all previously completed steps. This is the saga pattern, commonly used for booking workflows (flights, hotels, cars) where a failure in one step means you need to cancel everything done so far.

WorkflowResult result = SagaExecutor.builder()
    .step(WorkflowStep.of("book-flight", bookFlightAgent)
        .withCompensation(WorkflowStep.of("cancel-flight", cancelFlightAgent)))
    .step(WorkflowStep.of("book-hotel", bookHotelAgent)
        .withCompensation(WorkflowStep.of("cancel-hotel", cancelHotelAgent)))
    .step(WorkflowStep.of("book-car", bookCarAgent)
        .withCompensation(WorkflowStep.of("cancel-car", cancelCarAgent)))
    .step(WorkflowStep.of("confirm", confirmAgent))
    .build()
    .execute(new WorkflowContext("saga-1"));

Compensation Behavior

Here is how the saga pattern handles failures and rollbacks:

  • Steps execute in order. Each step's output is stored in context as step.<id>.output.
  • On failure, compensation runs in reverse order for all completed steps.
  • Steps without a compensation action are skipped during rollback.
  • Compensation failures are logged but do not stop the compensation chain -- all compensations are attempted.
  • Conditional steps (withCondition()) whose predicate returns false are skipped entirely.

Async Execution

Like the DAG executor, sagas also support non-blocking execution via CompletableFuture.

CompletableFuture<WorkflowResult> future = saga.executeAsync(context);

Choosing Between DAG and Saga

Use this table to decide which executor fits your workflow. The key question is whether you need parallel execution or rollback-on-failure semantics.

AspectDAGExecutorSagaExecutor
ExecutionParallel where possibleSequential
Failure handlingPartial results returnedCompensation (rollback)
Use caseIndependent parallel tasks with merge pointsTransactions that need undo on failure
Step orderingDependency graphInsertion order
TimeoutGlobal + per-stepPer-step only

On this page