Workflows
TnsAI.Coordination provides two workflow executors: `DAGExecutor` for parallel dependency-graph execution and `SagaExecutor` for sequential execution with compensation (rollback) on failure.
WorkflowStep
A workflow step represents one unit of work -- a single agent doing a single task. Steps can depend on other steps, have timeouts, and include compensation actions for rollback. The step is defined as an immutable record.
```java
public record WorkflowStep(
    String id,
    String name,
    Agent agent,
    List<String> dependencies,
    Optional<Predicate<WorkflowContext>> condition,
    Optional<WorkflowStep> compensation,
    long timeoutMs
)
```
Creating Steps
Steps are created using factory methods and can be customized with dependencies, compensation actions, conditions, and timeouts.
```java
// Simple step (no dependencies)
WorkflowStep fetch = WorkflowStep.of("fetch-data", fetchAgent);

// Step with dependencies
WorkflowStep merge = WorkflowStep.of("merge", mergeAgent, List.of("fetch-a", "fetch-b"));

// Step with compensation (for saga pattern)
WorkflowStep book = WorkflowStep.of("book-flight", bookAgent)
    .withCompensation(WorkflowStep.of("cancel-flight", cancelAgent));

// Conditional step (only runs if predicate is true)
WorkflowStep notify = WorkflowStep.of("notify", notifyAgent)
    .withCondition(ctx -> ctx.has("step.merge.output"));

// Step with timeout
WorkflowStep slow = WorkflowStep.of("external-call", apiAgent)
    .withTimeout(30_000); // 30 seconds
```
`isRoot()` returns `true` when the step has no dependencies.
WorkflowContext
The workflow context is a shared data store that all steps in a workflow can read from and write to. It holds initial parameters, intermediate results, and step outputs. It is thread-safe (backed by a `ConcurrentHashMap`), so multiple DAG steps running in parallel can access it safely.
```java
WorkflowContext context = new WorkflowContext("workflow-123");

// Store/retrieve arbitrary data
context.put("inputFile", "/data/report.csv");
String file = context.get("inputFile");
String fallback = context.getOrDefault("missing", "default-value");
boolean exists = context.has("inputFile");

// Step results are recorded automatically
context.recordStepResult("fetch", StepResult.success("fetch", "data...", 250));
WorkflowContext.StepResult result = context.getStepResult("fetch");
Map<String, WorkflowContext.StepResult> all = context.getAllStepResults();
```
You can pass initial data at construction:
```java
WorkflowContext context = new WorkflowContext("wf-1", Map.of(
    "region", "us-east-1",
    "maxRetries", 3
));
```
StepResult
Each completed step produces a StepResult that records whether it succeeded or failed, the output or error message, and how long it took.
```java
public record StepResult(String stepId, boolean success, String output, long durationMs, String error) {
    static StepResult success(String stepId, String output, long durationMs);
    static StepResult failure(String stepId, String error, long durationMs);
}
```
WorkflowResult
After a workflow finishes (whether it succeeds or fails), you get a WorkflowResult containing the overall success status, all step results, total duration, and any error message.
```java
public record WorkflowResult(
    String workflowId,
    boolean success,
    List<StepResult> stepResults,
    long totalDurationMs,
    String error
) {
    long completedCount(); // number of successful steps
    long failedCount();    // number of failed steps
}
```
DAGExecutor
The DAG (Directed Acyclic Graph) executor is for workflows where some steps can run in parallel because they don't depend on each other. You declare each step's dependencies, and the executor automatically figures out what can run concurrently and what must wait. Steps with no dependencies run in parallel; dependent steps wait for all prerequisites to complete.
```java
WorkflowResult result = DAGExecutor.builder()
    .step("fetch-a", fetchAgentA)
    .step("fetch-b", fetchAgentB)
    .step("merge", mergeAgent, List.of("fetch-a", "fetch-b"))
    .step("report", reportAgent, List.of("merge"))
    .timeout(Duration.ofMinutes(10))
    .build()
    .execute(new WorkflowContext("dag-1"));
```
How It Works
Here is what happens internally when the DAG executor runs your workflow:
- Validation: On `build()`, the DAG is validated -- all dependency references must exist, and cycle detection runs via topological sort (Kahn's algorithm).
- Topological Sort: Steps are ordered so that dependencies execute before dependents.
- Parallel Execution: Root steps (no dependencies) start immediately on a cached thread pool. Each dependent step waits via `CompletableFuture.allOf()` on its prerequisites.
- Dependency Failure: If a dependency fails, the dependent step is marked as failed without execution.
- Conditional Steps: Steps with a `condition` predicate are evaluated before execution. If the condition returns `false`, the step is marked as `SKIPPED`.
- Context Propagation: Each step receives a prompt that includes outputs from its dependency steps, enabling data flow through the DAG.
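The validation and ordering phases can be sketched with a small, self-contained version of Kahn's algorithm. This is an illustration only, not the library's actual code: missing dependency references and cycles both surface as exceptions, and root steps (in-degree zero) are exactly the ones eligible to start immediately.

```java
import java.util.*;

// Illustrative sketch of DAG validation via Kahn's algorithm (not the
// library's actual implementation). Input maps each step id to its
// dependency ids; output is a valid execution order.
public class DagValidation {
    public static List<String> topologicalOrder(Map<String, List<String>> deps) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> dependents = new HashMap<>();
        for (Map.Entry<String, List<String>> e : deps.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String dep : e.getValue()) {
                // Every dependency reference must name a known step.
                if (!deps.containsKey(dep)) {
                    throw new IllegalStateException("unknown dependency: " + dep);
                }
                inDegree.merge(e.getKey(), 1, Integer::sum);
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(e.getKey());
            }
        }
        // Root steps (no dependencies) are ready right away.
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((id, d) -> { if (d == 0) ready.add(id); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String id = ready.poll();
            order.add(id);
            // Completing a step may release its dependents.
            for (String next : dependents.getOrDefault(id, List.of())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) ready.add(next);
            }
        }
        // If some step never became ready, the graph contains a cycle.
        if (order.size() != deps.size()) throw new IllegalStateException("cycle detected");
        return order;
    }
}
```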
Async Execution
If you don't want to block the calling thread, use executeAsync() to get a CompletableFuture that completes when the DAG finishes.
```java
CompletableFuture<WorkflowResult> future = dag.executeAsync(context);
future.thenAccept(result -> {
    System.out.println("DAG completed: " + result.success());
});
```
Timeout
You can set both a global timeout for the entire DAG and individual timeouts per step. The global timeout defaults to 5 minutes; per-step timeouts are set via `withTimeout()`.
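A per-step timeout of this kind can be enforced with `CompletableFuture.orTimeout` (Java 9+). The sketch below is illustrative, not the executor's actual implementation:

```java
import java.util.concurrent.*;

// Illustrative sketch of a per-step timeout using CompletableFuture.orTimeout;
// not the DAGExecutor's actual code.
public class TimeoutSketch {
    /** Runs a task sleeping taskMs and reports whether it missed its timeoutMs deadline. */
    public static boolean timedOut(long taskMs, long timeoutMs) {
        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(taskMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "done";
        });
        try {
            // orTimeout completes the future exceptionally if the deadline passes first.
            task.orTimeout(timeoutMs, TimeUnit.MILLISECONDS).join();
            return false; // finished within the deadline
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException; // deadline missed
        }
    }
}
```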
SagaExecutor
The Saga executor is for workflows where you need "all or nothing" semantics. Steps run in sequence, and if any step fails, the executor automatically runs compensation (undo) actions in reverse order for all previously completed steps. This is the saga pattern, commonly used for booking workflows (flights, hotels, cars) where a failure in one step means you need to cancel everything done so far.
```java
WorkflowResult result = SagaExecutor.builder()
    .step(WorkflowStep.of("book-flight", bookFlightAgent)
        .withCompensation(WorkflowStep.of("cancel-flight", cancelFlightAgent)))
    .step(WorkflowStep.of("book-hotel", bookHotelAgent)
        .withCompensation(WorkflowStep.of("cancel-hotel", cancelHotelAgent)))
    .step(WorkflowStep.of("book-car", bookCarAgent)
        .withCompensation(WorkflowStep.of("cancel-car", cancelCarAgent)))
    .step(WorkflowStep.of("confirm", confirmAgent))
    .build()
    .execute(new WorkflowContext("saga-1"));
```
Compensation Behavior
Here is how the saga pattern handles failures and rollbacks:
- Steps execute in order. Each step's output is stored in context as `step.<id>.output`.
- On failure, compensation runs in reverse order for all completed steps.
- Steps without a compensation action are skipped during rollback.
- Compensation failures are logged but do not stop the compensation chain -- all compensations are attempted.
- Conditional steps (`withCondition()`) whose predicate returns `false` are skipped entirely.
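The rollback rules above can be condensed into a few lines. The `Step` shape and `run` method here are illustrative assumptions, not the `SagaExecutor`'s real types; the point is the reverse-order compensation of completed steps on first failure.

```java
import java.util.*;
import java.util.function.Supplier;

// Illustrative sketch of the saga rollback loop (not TnsAI.Coordination's
// actual code): run steps in order; on the first failure, compensate the
// completed steps in reverse order, attempting every compensation.
public class SagaSketch {
    record Step(String id, Supplier<Boolean> action, Runnable compensation) {}

    /** Returns the ids of the steps whose compensations ran, in rollback order. */
    public static List<String> run(List<Step> steps) {
        Deque<Step> completed = new ArrayDeque<>(); // LIFO -> reverse-order rollback
        List<String> compensated = new ArrayList<>();
        for (Step step : steps) {
            if (step.action().get()) {
                completed.push(step); // remember for potential rollback
            } else {
                // Failure: compensate completed steps in reverse order.
                while (!completed.isEmpty()) {
                    Step done = completed.pop();
                    if (done.compensation() == null) continue; // no compensation: skipped
                    try {
                        done.compensation().run();
                        compensated.add(done.id());
                    } catch (RuntimeException e) {
                        // Logged in the real executor; the chain continues regardless.
                    }
                }
                break;
            }
        }
        return compensated;
    }
}
```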
Async Execution
Like the DAG executor, sagas also support non-blocking execution via CompletableFuture.
```java
CompletableFuture<WorkflowResult> future = saga.executeAsync(context);
```
Choosing Between DAG and Saga
Use this table to decide which executor fits your workflow. The key question is whether you need parallel execution or rollback-on-failure semantics.
| Aspect | DAGExecutor | SagaExecutor |
|---|---|---|
| Execution | Parallel where possible | Sequential |
| Failure handling | Partial results returned | Compensation (rollback) |
| Use case | Independent parallel tasks with merge points | Transactions that need undo on failure |
| Step ordering | Dependency graph | Insertion order |
| Timeout | Global + per-step | Per-step only |
Group Topologies
TnsAI.Coordination provides 8 group topologies for structuring multi-agent collaboration. Each topology has a dedicated builder and follows the lifecycle: create -> start -> execute -> stop.
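The create -> start -> execute -> stop lifecycle can be illustrated with a toy topology. `GroupTopology`, `EchoGroup`, and their methods below are hypothetical stand-ins for illustration, not the library's actual interfaces:

```java
// Hypothetical sketch of the create -> start -> execute -> stop lifecycle.
// GroupTopology and EchoGroup are illustrative assumptions, not the
// library's real API.
public class LifecycleSketch {
    interface GroupTopology {
        void start();
        String execute(String task);
        void stop();
    }

    static class EchoGroup implements GroupTopology {
        private boolean running;
        public void start() { running = true; }
        public String execute(String task) {
            if (!running) throw new IllegalStateException("start() must precede execute()");
            return "handled: " + task;
        }
        public void stop() { running = false; }
    }

    public static String runLifecycle(String task) {
        GroupTopology group = new EchoGroup(); // create
        group.start();                         // start
        try {
            return group.execute(task);        // execute
        } finally {
            group.stop();                      // stop (always, even on failure)
        }
    }
}
```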
AutoTeamBuilder
TnsAI.Intelligence provides LLM-driven automatic team composition. Given a task description, `AutoTeamBuilder` decomposes it into subtasks, generates agent configurations, and selects the optimal coordination topology. Package: `com.tnsai.autoteam`.