Advanced Patterns

Advanced coordination patterns in TnsAI.Coordination for production multi-agent systems.

ScatterGather

ScatterGather is a utility for fan-out/fan-in communication: it sends the same task to multiple agents at once and collects their responses. This is useful when you want multiple perspectives on the same question, or when you need redundancy for reliability. It supports six collection strategies: all, first-wins, quorum, consensus, score-based, and aggregate.

Scatter (Collect All)

Send the task to all agents and wait for every response. You get back all results along with metrics like success rate and which agent responded fastest.

ScatterGather.ScatterResults results = ScatterGather.scatter(
    "Analyze market trends for Q4",
    List.of(analyst1, analyst2, analyst3),
    30, TimeUnit.SECONDS
);

List<String> responses = results.responseTexts();       // successful response texts
double successRate = results.successRate();              // 0.0 - 1.0
Optional<AgentResponse> fastest = results.fastest();     // quickest responder

Race (First Wins)

When speed matters more than comprehensiveness, race() returns the first successful response and cancels the rest.

Optional<AgentResponse> winner = ScatterGather.race(
    "What is the capital of France?",
    List.of(agent1, agent2, agent3),
    10, TimeUnit.SECONDS
);

Quorum (Wait for N)

When you need a minimum number of confirmations before proceeding (like a distributed consensus check), quorum() waits until at least N agents respond successfully or the timeout elapses, whichever comes first.

ScatterGather.ScatterResults results = ScatterGather.quorum(
    "Verify transaction hash",
    validators,
    3,                   // need 3 confirmations
    30, TimeUnit.SECONDS
);
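
The waiting logic behind a quorum can be pictured with plain java.util.concurrent primitives. The following is a minimal sketch under stated assumptions, not the library's implementation: QuorumSketch and its quorum method are hypothetical names, and agents are stood in for by Callable<String> tasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; not part of TnsAI.Coordination.
class QuorumSketch {

    // Runs all tasks in parallel and returns once `required` of them have
    // succeeded, all have finished, or the timeout has elapsed.
    // The returned list may therefore hold fewer than `required` results.
    static List<String> quorum(List<Callable<String>> tasks, int required,
                               long timeout, TimeUnit unit) {
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        CompletionService<String> completion = new ExecutorCompletionService<>(executor);
        List<String> successes = new ArrayList<>();
        try {
            for (Callable<String> task : tasks) {
                completion.submit(task);
            }
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            int pending = tasks.size();
            while (successes.size() < required && pending > 0) {
                long waitNanos = deadline - System.nanoTime();
                if (waitNanos <= 0) {
                    break; // overall timeout reached
                }
                Future<String> done = completion.poll(waitNanos, TimeUnit.NANOSECONDS);
                if (done == null) {
                    break; // nothing finished before the deadline
                }
                pending--;
                try {
                    successes.add(done.get());
                } catch (ExecutionException failed) {
                    // A failed task does not count toward the quorum.
                }
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        } finally {
            executor.shutdownNow(); // cancel tasks that are still running
        }
        return successes;
    }

    public static void main(String[] args) {
        List<Callable<String>> validators = List.of(
            () -> "ok-1",
            () -> { throw new IllegalStateException("validator down"); },
            () -> "ok-3",
            () -> "ok-4"
        );
        List<String> confirmations = quorum(validators, 3, 5, TimeUnit.SECONDS);
        System.out.println("Quorum reached: " + (confirmations.size() >= 3));
    }
}
```

Callers of such a method still need to compare the number of results against the required count, because a timeout yields a partial list rather than an exception in this sketch.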

Consensus (Majority Vote)

When you want to find the "agreed-upon" answer from multiple agents, gatherWithConsensus() collects all responses and picks the most common answer using word-level Jaccard similarity.

Optional<AgentResponse> consensus = ScatterGather.gatherWithConsensus(
    "What is 2+2?", agents, 30, TimeUnit.SECONDS
);
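
The exact selection rule is not spelled out here. As a rough sketch of how word-level Jaccard similarity can drive a majority vote, the standalone example below scores each response by its mean similarity to the other responses and returns the highest-scoring one. It uses only the JDK; the class and method names are hypothetical, and the "highest mean similarity" rule is an assumption rather than the library's documented behavior.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Hypothetical illustration; not part of TnsAI.Coordination.
class JaccardConsensusSketch {

    // Word-level Jaccard similarity: |A ∩ B| / |A ∪ B| over lower-cased word sets.
    static double jaccard(String a, String b) {
        Set<String> wordsA = words(a);
        Set<String> wordsB = words(b);
        if (wordsA.isEmpty() && wordsB.isEmpty()) {
            return 1.0; // two empty responses are treated as identical
        }
        Set<String> intersection = new HashSet<>(wordsA);
        intersection.retainAll(wordsB);
        Set<String> union = new HashSet<>(wordsA);
        union.addAll(wordsB);
        return (double) intersection.size() / union.size();
    }

    // Splits on non-word characters and drops empty tokens.
    private static Set<String> words(String text) {
        Set<String> result = new HashSet<>();
        for (String word : text.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                result.add(word);
            }
        }
        return result;
    }

    // Returns the response with the highest mean similarity to all others.
    // Ties are resolved in favor of the earliest response.
    static Optional<String> pickConsensus(List<String> responses) {
        String best = null;
        double bestScore = -1.0;
        for (int i = 0; i < responses.size(); i++) {
            double total = 0.0;
            for (int j = 0; j < responses.size(); j++) {
                if (i != j) {
                    total += jaccard(responses.get(i), responses.get(j));
                }
            }
            double mean = responses.size() > 1 ? total / (responses.size() - 1) : 1.0;
            if (mean > bestScore) {
                bestScore = mean;
                best = responses.get(i);
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<String> responses = List.of("The answer is 4", "the answer is 4", "It is 5");
        System.out.println(pickConsensus(responses).orElse("<none>")); // prints "The answer is 4"
    }
}
```

Because the comparison works on word sets, it tolerates differences in case and punctuation but not paraphrases, so short factual prompts are where this kind of vote is most reliable.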

Score-Based Selection

When you have a specific quality metric in mind, gatherWithScore() lets you provide a scoring function to rank responses and pick the best one.

Optional<AgentResponse> best = ScatterGather.gatherWithScore(
    "Write a haiku about Java",
    agents,
    response -> (double) response.length(),  // score by length
    30, TimeUnit.SECONDS
);

Aggregate (Synthesize)

When you want a single combined answer instead of picking one response, gatherAndAggregate() collects all responses and passes them to a dedicated aggregator agent that synthesizes them into a unified answer.

Optional<String> synthesized = ScatterGather.gatherAndAggregate(
    "What are the risks of this deployment?",
    List.of(securityAgent, infraAgent, qaAgent),
    summarizerAgent,
    60, TimeUnit.SECONDS
);

StreamingChannel

A StreamingChannel lets agents send a continuous flow of data to each other in real time, rather than waiting for complete responses. This is useful for long-running tasks where you want to process results incrementally. It is a typed streaming channel with configurable backpressure that supports reactive-style map, filter, and subscribe operations.

StreamingChannel<String> channel = StreamingChannel.<String>builder()
    .id("analysis-stream")
    .bufferSize(100)
    .backpressure(BackpressureStrategy.DROP_OLDEST)
    .offerTimeout(Duration.ofSeconds(5))
    .build();

// Consumer side
channel.subscribe(chunk -> processChunk(chunk));
channel.onError(err -> log.error("Stream error", err));
channel.onComplete(() -> log.info("Stream finished"));

// Producer side
channel.emit("data chunk 1");
channel.emit("data chunk 2");
channel.emitAll(List.of("chunk 3", "chunk 4", "chunk 5"));
channel.complete();

BackpressureStrategy

When a producer emits data faster than the consumer can handle, the backpressure strategy determines what happens. Choose based on whether you prefer to wait, lose old data, lose new data, or fail loudly.

| Strategy | Behavior |
|---|---|
| BLOCK | Block producer until buffer has space (with configurable timeout) |
| DROP_OLDEST | Remove oldest item from buffer to make room |
| DROP_NEWEST | Reject incoming item silently |
| ERROR | Throw BufferOverflowException when full |
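
To make the difference between the non-blocking strategies concrete, here is a minimal single-threaded sketch of a bounded buffer. It is an illustration only: BackpressureSketch, BoundedBuffer, and the local Strategy enum are hypothetical names, BLOCK is left out because it needs a second thread to drain the buffer, and IllegalStateException stands in for the library's BufferOverflowException.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration; not part of TnsAI.Coordination.
class BackpressureSketch {

    enum Strategy { DROP_OLDEST, DROP_NEWEST, ERROR }

    static final class BoundedBuffer<T> {
        private final Deque<T> items = new ArrayDeque<>();
        private final int capacity;
        private final Strategy strategy;
        private long dropped;

        BoundedBuffer(int capacity, Strategy strategy) {
            if (capacity < 1) {
                throw new IllegalArgumentException("capacity must be >= 1");
            }
            this.capacity = capacity;
            this.strategy = strategy;
        }

        // Returns true if the item ended up in the buffer.
        boolean offer(T item) {
            if (items.size() < capacity) {
                items.addLast(item);
                return true;
            }
            switch (strategy) {
                case DROP_OLDEST:
                    items.removeFirst(); // evict the oldest item to make room
                    dropped++;
                    items.addLast(item);
                    return true;
                case DROP_NEWEST:
                    dropped++;           // silently reject the incoming item
                    return false;
                default:
                    throw new IllegalStateException("buffer full");
            }
        }

        List<T> snapshot() {
            return List.copyOf(items);
        }

        long dropped() {
            return dropped;
        }
    }

    public static void main(String[] args) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(2, Strategy.DROP_OLDEST);
        buffer.offer(1);
        buffer.offer(2);
        buffer.offer(3);
        System.out.println(buffer.snapshot() + " dropped=" + buffer.dropped()); // prints "[2, 3] dropped=1"
    }
}
```

DROP_OLDEST suits cases where only recent data matters, while DROP_NEWEST preserves what is already queued. In both cases the drop counter is what makes the loss visible.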

Transformations

You can chain filter() and map() operations on a channel to create derived channels, similar to Java Streams or reactive pipelines.

StreamingChannel<Integer> lengths = channel
    .filter(s -> !s.isBlank())
    .map(String::length);

Metrics

Each channel tracks how many items were emitted, dropped, and buffered, so you can monitor throughput and detect backpressure issues.

StreamingChannel.ChannelMetrics metrics = channel.metrics();
// channelId, emitted, dropped, bufferUsed, bufferCapacity, subscriberCount, completed
double dropRate = metrics.dropRate(); // percentage of dropped items

The channel implements Closeable. Call close() to shut down the dispatcher thread and clear the buffer.

HandoffChain

A HandoffChain passes a task through a sequence of agents, where each agent's output becomes the next agent's input. This is similar to a pipeline but uses the handoff mechanism, which allows agents to pass structured context (metadata, priority, previous output) along the chain. It also supports conditional steps, which are skipped when their predicate on the context returns false.

HandoffChain chain = HandoffChain.builder()
    .add(triageAgent)
    .add(researchAgent)
    .add(qualityAgent, ctx -> ctx.has("previousOutput"))  // conditional
    .add(responseAgent)
    .build();

HandoffContext initialContext = new HandoffContext(
    "task-123",
    Map.of("query", "How to reset password?"),
    Map.of("priority", "high"),
    null
);

// `handoff` is the handoff mechanism instance; its construction is not shown in this section.
List<HandoffResult> results = chain.execute(handoff, initialContext);

The first agent in the chain is the source. Conditional steps whose predicate returns false are skipped. Execution stops on the first failure.
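
The execution rules above (run in order, skip steps whose predicate is false, stop on the first failure) can be sketched without any TnsAI types. In this hypothetical example, agents are plain Function<String, String> values, the context is reduced to the current text, and Step, StepResult, and execute are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration; not part of TnsAI.Coordination.
class HandoffChainSketch {

    // A step has a name, a transformation, and a condition deciding whether it runs.
    record Step(String name, Function<String, String> agent, Predicate<String> condition) {}

    record StepResult(String name, boolean success, String output) {}

    static List<StepResult> execute(List<Step> steps, String input) {
        List<StepResult> results = new ArrayList<>();
        String current = input;
        for (Step step : steps) {
            if (!step.condition().test(current)) {
                continue; // predicate returned false: skip this step
            }
            try {
                current = step.agent().apply(current); // output feeds the next step
                results.add(new StepResult(step.name(), true, current));
            } catch (RuntimeException e) {
                results.add(new StepResult(step.name(), false, e.getMessage()));
                break; // stop on the first failure
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
            new Step("triage", s -> s + " [triaged]", s -> true),
            new Step("quality", s -> s + " [checked]", s -> s.contains("urgent")),
            new Step("respond", s -> s + " [answered]", s -> true)
        );
        for (StepResult result : execute(steps, "How to reset password?")) {
            System.out.println(result.name() + " -> " + result.output());
        }
    }
}
```

In this run the quality step is skipped because the input does not contain "urgent", so only the triage and respond steps produce results.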

LazyAgentPool

When you have many concurrent tasks but don't want to create a new agent for each one, LazyAgentPool manages a reusable pool of agents. Agents are created on demand, returned to the pool after use, and automatically destroyed when idle too long. This is an object pool with lazy creation, idle eviction, and configurable activation policies.

LazyAgentPool pool = LazyAgentPool.builder()
    .factory(() -> new WorkerAgent(llmClient))
    .maxSize(20)
    .minIdle(2)
    .idleTimeout(5, TimeUnit.MINUTES)
    .policy(ActivationPolicy.PREWARMED)
    .listener(new PoolListener() { ... })
    .build();

// Checkout/checkin via try-with-resources
try (AgentHandle handle = pool.checkout()) {
    String result = handle.execute("Process this document");
} // agent is returned to pool automatically

ActivationPolicy

The activation policy controls when agents are created. Pre-warming creates agents upfront so they are ready immediately, while on-demand waits until they are needed.

| Policy | Behavior |
|---|---|
| ON_DEMAND | Agents created only when checked out |
| PREWARMED | Pre-creates minIdle agents at pool construction |

PoolMetrics

Monitor pool utilization to right-size your pool configuration and detect resource leaks.

PoolMetrics metrics = pool.getMetrics();
// active, idle, totalCreated, totalDestroyed, peakActive

The pool schedules periodic eviction to destroy idle agents exceeding minIdle. Call preWarm(count) to proactively create agents before a traffic spike. The pool implements Closeable and destroys all agents on close().
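
The checkout/checkin cycle and lazy creation can be illustrated with a small generic pool. This is a simplified sketch under stated assumptions: LazyPoolSketch and its members are hypothetical names, idle eviction and listeners are omitted, and an exhausted pool throws instead of waiting.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical illustration; not part of TnsAI.Coordination.
class LazyPoolSketch<T> {
    private final Supplier<T> factory;
    private final int maxSize;
    private final Deque<T> idle = new ArrayDeque<>();
    private int totalCreated;
    private int active;

    LazyPoolSketch(Supplier<T> factory, int maxSize) {
        this.factory = factory;
        this.maxSize = maxSize;
    }

    // Pre-creates up to `count` idle instances, bounded by maxSize.
    synchronized void preWarm(int count) {
        while (count-- > 0 && totalCreated < maxSize) {
            idle.addLast(factory.get());
            totalCreated++;
        }
    }

    // Reuses an idle instance if one exists, otherwise creates one lazily.
    synchronized Handle checkout() {
        T item = idle.pollFirst();
        if (item == null) {
            if (totalCreated >= maxSize) {
                throw new IllegalStateException("pool exhausted");
            }
            item = factory.get();
            totalCreated++;
        }
        active++;
        return new Handle(item);
    }

    private synchronized void checkin(T item) {
        active--;
        idle.addFirst(item);
    }

    synchronized int totalCreated() { return totalCreated; }
    synchronized int activeCount() { return active; }
    synchronized int idleCount() { return idle.size(); }

    // Returning the instance on close() is what makes try-with-resources work.
    final class Handle implements AutoCloseable {
        private final T item;
        private boolean released;

        private Handle(T item) {
            this.item = item;
        }

        T get() {
            return item;
        }

        @Override
        public void close() {
            if (!released) { // guard against double release
                released = true;
                checkin(item);
            }
        }
    }

    public static void main(String[] args) {
        LazyPoolSketch<StringBuilder> pool = new LazyPoolSketch<>(StringBuilder::new, 2);
        try (LazyPoolSketch<StringBuilder>.Handle handle = pool.checkout()) {
            handle.get().append("work");
        }
        System.out.println("created=" + pool.totalCreated() + " idle=" + pool.idleCount()); // prints "created=1 idle=1"
    }
}
```

A handle that is never closed stays counted as active, which is the kind of leak that the active and peakActive metrics help surface.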

SupervisorAgent with Guardrails

A SupervisorAgent adds a safety layer around a group of agents. Before any agent executes an action, the supervisor checks it against a list of guardrails (policy rules). This lets you enforce constraints like "require human approval for deployments" or "block actions that exceed a budget" without modifying individual agents.

SupervisorAgent supervisor = new SupervisorAgent(group, List.of(
    new HITLApproval(Set.of("deploy", "delete"), approvalCallback),
    myBudgetGuardrail,
    myContentFilterGuardrail
));

Guardrail.GuardrailResult result = supervisor.checkAction(
    "agent-1", "deploy", Map.of("target", "production")
);

if (result.allowed()) {
    // proceed
} else {
    System.out.println("Blocked: " + result.reason());
    // result.severity() -> BLOCK or WARNING
}

// Dynamic guardrail management
supervisor.addGuardrail(new RateLimitGuardrail());
supervisor.removeGuardrail("old-guardrail-name");

Guardrail Interface

Each guardrail implements a simple interface: given an agent ID, action name, and context, it returns a result saying whether the action is allowed, blocked, or allowed with a warning.

public interface Guardrail {
    GuardrailResult check(String agentId, String action, Map<String, Object> context);
    String getName();
}

Guardrails are evaluated in order. If any guardrail blocks, evaluation stops immediately. WARNING severity allows execution but logs a warning.
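
The ordering and short-circuit behavior can be shown in a few lines. The sketch below is a standalone approximation: Result, Severity, NamedGuardrail, and evaluate are hypothetical names, the context map is omitted for brevity, and returning the most recent warning when nothing blocks is an assumption.

```java
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical illustration; not part of TnsAI.Coordination.
class GuardrailEvaluationSketch {

    enum Severity { NONE, WARNING, BLOCK }

    record Result(boolean allowed, Severity severity, String reason) {
        static Result allow() { return new Result(true, Severity.NONE, ""); }
        static Result warn(String reason) { return new Result(true, Severity.WARNING, reason); }
        static Result block(String reason) { return new Result(false, Severity.BLOCK, reason); }
    }

    // A guardrail reduced to a name plus a check on (agentId, action).
    record NamedGuardrail(String name, BiFunction<String, String, Result> check) {}

    static Result evaluate(List<NamedGuardrail> guardrails, String agentId, String action) {
        Result outcome = Result.allow();
        for (NamedGuardrail guardrail : guardrails) {
            Result result = guardrail.check().apply(agentId, action);
            if (result.severity() == Severity.BLOCK) {
                return result; // first block wins; later guardrails are not evaluated
            }
            if (result.severity() == Severity.WARNING) {
                System.err.println("WARN [" + guardrail.name() + "]: " + result.reason());
                outcome = result; // execution is still allowed
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        List<NamedGuardrail> guardrails = List.of(
            new NamedGuardrail("audit", (agent, action) -> Result.warn("action is audited")),
            new NamedGuardrail("no-deploy", (agent, action) ->
                action.equals("deploy") ? Result.block("deploys need approval") : Result.allow())
        );
        System.out.println(evaluate(guardrails, "agent-1", "deploy").allowed()); // prints "false"
        System.out.println(evaluate(guardrails, "agent-1", "read").allowed());   // prints "true"
    }
}
```

Because evaluation stops at the first block, placing cheap or strict guardrails early avoids running more expensive checks, such as a human approval request, for actions that would be rejected anyway.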

Built-in: HITLApproval

The Human-in-the-Loop (HITL) guardrail pauses execution and requests human approval for sensitive actions. It takes a set of action names that require approval and a callback for requesting it.

Built-in: SandboxedExecution

The SandboxedExecution guardrail runs agent actions in an isolated context for safety testing, ensuring they cannot affect production state.

TrustRegistry

The TrustRegistry tracks how reliably each agent performs each capability. As agents complete (or fail) tasks, their trust scores update automatically. You can then use these scores to route tasks to the most trusted agent or filter out unreliable ones.

TrustRegistry registry = TrustRegistry.create(
    TrustUpdateStrategy.ema(0.3),  // Exponential Moving Average with alpha=0.3
    0.5                             // initial trust score for new agents
);

// Record outcomes
registry.recordOutcome("agent-a", "translation", Outcome.SUCCESS, 0.95);
registry.recordOutcome("agent-a", "translation", Outcome.PARTIAL, 0.60);
registry.recordOutcome("agent-b", "translation", Outcome.FAILURE, 0.0);

// Query trust
double trust = registry.getTrust("agent-a", "translation"); // 0.0 - 1.0

// Rank agents for a capability
List<String> best = registry.rankAgents("translation", 3);

// Get agents above threshold
List<String> qualified = registry.getAgentsAboveThreshold("translation", 0.7);

// Periodic decay (call e.g. daily)
registry.applyDecay();

TrustUpdateStrategy

The update strategy determines how new outcomes affect an agent's trust score. EMA (exponential moving average) gives more weight to recent performance, while simple average treats all outcomes equally. You can also provide custom implementations. The TrustBasedRouter uses the registry to route tasks to the most trusted agent for a given capability.
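
For reference, the standard EMA update is new = alpha * observed + (1 - alpha) * current. The sketch below applies that textbook formula to the alpha and initial score used earlier; whether the library's ema strategy uses exactly this form is an assumption, and TrustUpdateSketch with its methods is a hypothetical name.

```java
import java.util.Locale;

// Hypothetical illustration; not part of TnsAI.Coordination.
class TrustUpdateSketch {

    // Exponential moving average: recent observations weigh more as alpha grows.
    static double ema(double current, double observed, double alpha) {
        if (alpha <= 0.0 || alpha > 1.0) {
            throw new IllegalArgumentException("alpha must be in (0, 1]");
        }
        return alpha * observed + (1.0 - alpha) * current;
    }

    // Simple running average: every observation has equal weight.
    // `previousCount` is the number of observations already in `currentMean`.
    static double runningAverage(double currentMean, int previousCount, double observed) {
        return currentMean + (observed - currentMean) / (previousCount + 1);
    }

    public static void main(String[] args) {
        double trust = 0.5;              // initial score for a new agent
        trust = ema(trust, 0.95, 0.3);   // about 0.635
        System.out.println(String.format(Locale.ROOT, "%.4f", trust));
        trust = ema(trust, 0.60, 0.3);   // about 0.6245
        System.out.println(String.format(Locale.ROOT, "%.4f", trust));
    }
}
```

With alpha = 0.3, a single strong success moves a new agent from 0.5 to roughly 0.635, so several consistent outcomes are needed before the score crosses a threshold such as 0.7.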

RecursiveTaskSolver (ROMA)

The RecursiveTaskSolver handles complex tasks that are too big for a single agent. It breaks a task into smaller sub-tasks, solves each one, combines the results, and verifies quality. If the sub-tasks are still too big, it recurses until reaching atomic (simple enough to solve directly) tasks. This implements the Recursive Orchestration of Multi-Agent (ROMA) pattern.

RecursiveTaskSolver solver = RecursiveTaskSolver.builder()
    .solver(solverAgent)
    .decomposer(TaskDecomposer.llmBased(plannerAgent))
    .atomizer(Atomizer.depthBased(3))
    .verifier(Verifier.llmBased(reviewerAgent))
    .maxDepth(5)
    .maxRetries(2)
    .build();

RecursiveTaskSolver.SolveResult result = solver.solve(
    "Build a REST API for user management"
);

System.out.println("Success: " + result.success());
System.out.println("Output: " + result.output());
System.out.println("Traces: " + result.traces().size());
System.out.println("Duration: " + result.durationMs() + "ms");

Algorithm

Here is how the recursive solving process works step by step:

  1. Check if task is atomic (via Atomizer).
  2. If atomic: solve directly with solver agent, then verify.
  3. If not: decompose into sub-tasks (via TaskDecomposer).
  4. Recursively solve each sub-task.
  5. Aggregate sub-results using the solver agent.
  6. Verify aggregated result. Retry up to maxRetries on rejection.
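
The six steps above can be sketched as a small recursive function. This is an approximation built on plain functional interfaces rather than agents: all names are hypothetical, a retry re-runs the whole step for the task being verified (what exactly is retried is an assumption), and maxRetries = 2 is read as one initial attempt plus two retries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration; not part of TnsAI.Coordination.
class RecursiveSolveSketch {

    interface Atomizer {
        boolean isAtomic(String task, int depth);
    }

    record Solver(Atomizer atomizer,
                  Function<String, List<String>> decomposer,
                  Function<String, String> leafSolver,
                  Function<List<String>, String> aggregator,
                  Predicate<String> verifier,
                  int maxRetries) {

        String solve(String task, int depth) {
            for (int attempt = 0; attempt <= maxRetries; attempt++) {
                String result;
                if (atomizer.isAtomic(task, depth)) {
                    result = leafSolver.apply(task);             // step 2: solve directly
                } else {
                    List<String> partials = new ArrayList<>();
                    for (String sub : decomposer.apply(task)) {  // step 3: decompose
                        partials.add(solve(sub, depth + 1));     // step 4: recurse
                    }
                    result = aggregator.apply(partials);         // step 5: aggregate
                }
                if (verifier.test(result)) {                     // step 6: verify
                    return result;
                }
            }
            throw new IllegalStateException("Verification failed for task: " + task);
        }
    }

    public static void main(String[] args) {
        Solver solver = new Solver(
            (task, depth) -> !task.contains(",") || depth >= 3,  // atomic: no commas left, or too deep
            task -> {
                int comma = task.indexOf(',');
                return List.of(task.substring(0, comma), task.substring(comma + 1));
            },
            String::toUpperCase,                                 // stand-in for the solver agent
            parts -> String.join(" ", parts),
            result -> !result.isBlank(),
            2
        );
        System.out.println(solver.solve("build,test,deploy", 0)); // prints "BUILD TEST DEPLOY"
    }
}
```

The depth cap in the atomizer is what guarantees termination even if the decomposer keeps producing non-trivial sub-tasks.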

Components

The solver is composed of three pluggable components. You can use the built-in implementations or provide your own.

| Component | Responsibility | Built-in |
|---|---|---|
| Atomizer | Decides if a task is atomic | Atomizer.depthBased(maxDepth) |
| TaskDecomposer | Breaks task into sub-tasks | TaskDecomposer.llmBased(agent) |
| Verifier | Accepts or rejects results | Verifier.llmBased(agent), Verifier.alwaysAccept() |

SolveTrace

For debugging and auditability, each step in the recursive solving process produces a trace entry. Each entry records its type (DECOMPOSED, SOLVED, AGGREGATED, or VERIFIED), the task, detail, depth, and attempt number.

ChoreographyEngine

The ChoreographyEngine implements event-driven coordination without a central orchestrator. Instead of one agent telling others what to do, you define a set of event-reaction rules ("when event X happens, agent Y does Z and emits event W"), and the engine runs them automatically. This is ideal for microservice-style workflows where each agent handles its own piece of the process.

ChoreographySpec spec = ChoreographySpec.builder()
    .name("order-processing")
    .initialEvent("order.received")
    .interaction("validate", "validator", "order.received",
        "Validate the order", "order.validated")
    .interaction("bill", "billing", "order.validated",
        "Process payment", "payment.processed")
    .interaction("fulfill", "warehouse", "payment.processed",
        "Ship the order", "order.shipped")
    .build();

ChoreographyEngine engine = ChoreographyEngine.builder()
    .spec(spec)
    .agent("validator", validatorAgent)
    .agent("billing", billingAgent)
    .agent("warehouse", warehouseAgent)
    .timeout(Duration.ofMinutes(2))
    .build();

ChoreographyEngine.ChoreographyResult result = engine.execute();
System.out.println("Success: " + result.success());
System.out.println("Completed: " + result.completedCount());
System.out.println("Failed: " + result.failedCount());

Safety Features

The engine includes built-in safety checks to catch common pitfalls in event-driven systems before they cause problems at runtime.

  • DeadlockDetector: Analyzes the spec for circular event dependencies before execution.
  • LivelockDetector: Detects repetitive event patterns during execution.
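
One way to check a spec for circular event dependencies before execution is a depth-first search over the graph that links each trigger event to the event it emits. The sketch below shows that general technique only; the Interaction record and hasCycle method are hypothetical, and the real DeadlockDetector may analyze the spec differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration; not part of TnsAI.Coordination.
class EventCycleSketch {

    record Interaction(String name, String triggerEvent, String emittedEvent) {}

    // Returns true if following trigger -> emitted edges can lead back to an event.
    static boolean hasCycle(List<Interaction> interactions) {
        Map<String, List<String>> edges = new HashMap<>();
        for (Interaction interaction : interactions) {
            edges.computeIfAbsent(interaction.triggerEvent(), key -> new ArrayList<>())
                 .add(interaction.emittedEvent());
        }
        Set<String> finished = new HashSet<>(); // fully explored events
        Set<String> onPath = new HashSet<>();   // events on the current DFS path
        for (String event : edges.keySet()) {
            if (visit(event, edges, finished, onPath)) {
                return true;
            }
        }
        return false;
    }

    private static boolean visit(String event, Map<String, List<String>> edges,
                                 Set<String> finished, Set<String> onPath) {
        if (onPath.contains(event)) {
            return true;  // reached an event already on the current path: cycle
        }
        if (finished.contains(event)) {
            return false; // already explored without finding a cycle
        }
        onPath.add(event);
        for (String next : edges.getOrDefault(event, List.of())) {
            if (visit(next, edges, finished, onPath)) {
                return true;
            }
        }
        onPath.remove(event);
        finished.add(event);
        return false;
    }

    public static void main(String[] args) {
        List<Interaction> spec = List.of(
            new Interaction("validate", "order.received", "order.validated"),
            new Interaction("bill", "order.validated", "payment.processed"),
            new Interaction("fulfill", "payment.processed", "order.shipped")
        );
        System.out.println("Cycle: " + hasCycle(spec)); // prints "Cycle: false"
    }
}
```

A static check like this runs in time linear in the number of interactions, which is why it is cheap enough to perform on every spec before execution starts.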

Async Execution

Like other executors, the choreography engine supports non-blocking execution via CompletableFuture.

CompletableFuture<ChoreographyResult> future = engine.executeAsync();

Simulation Framework

The simulation framework in TnsAI.Coordination lets you model and test multi-agent interactions in a controlled environment before deploying them. You define a world state and rules, then run agents through turns while a validator enforces the rules. See the examples/simulation/ directory in the Coordination repository.

SimulationEnvironment

The environment defines the initial world state (who the actors are, what resources exist, what relationships are in place) and the simulation constraints like maximum turns.

SimulationEnvironment env = SimulationEnvironment.builder()
    .name("diplomacy-sim")
    .initialState(Map.of(
        "countries", List.of("A", "B", "C"),
        "resources", Map.of("A", 100, "B", 80, "C", 120),
        "alliances", List.of()
    ))
    .maxTurns(50)
    .build();

SimulationLoop

The SimulationLoop is the main driver that runs the simulation turn by turn. Each turn, it asks agents for their actions, validates them, and applies them to the environment state.

SimulationLoop loop = SimulationLoop.builder()
    .environment(env)
    .agents(List.of(agentA, agentB, agentC))
    .validator(new SecretaryValidator())
    .build();

SimulationResult result = loop.run();
result.getTurnCount();       // Number of turns executed
result.getFinalState();      // Final environment state
result.getHistory();         // Turn-by-turn action history

SecretaryValidator

The SecretaryValidator acts as a referee, checking every agent action against the simulation rules before it is applied. It prevents illegal moves, enforces turn order, and checks resource constraints.

public class SecretaryValidator implements ActionValidator {
    @Override
    public ValidationResult validate(AgentAction action, Map<String, Object> state) {
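        // hasResources(...) is a helper that is not shown here.
        // The constant-first equals() avoids a NullPointerException if getType() returns null.
        if ("trade".equals(action.getType()) && !hasResources(action, state)) {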
            return ValidationResult.reject("Insufficient resources for trade");
        }
        return ValidationResult.accept();
    }
}

DiplomacySimulation Example

To see the simulation framework in action, the examples/simulation/ directory includes a complete diplomacy simulation with country agents that negotiate treaties, trade resources, and form alliances. The SecretaryValidator enforces diplomacy rules while the SimulationLoop manages negotiation rounds.

DiplomacySimulation sim = new DiplomacySimulation();
sim.addCountry("Alphaland", agentAlpha);
sim.addCountry("Betaland", agentBeta);
sim.addCountry("Gammaland", agentGamma);

SimulationResult result = sim.run(30); // 30 rounds
System.out.println("Final alliances: " + result.getFinalState().get("alliances"));

DebateExecutor

The DebateExecutor improves answer quality by having two LLM agents argue opposing positions on a question, then having a judge evaluate both sides and deliver a verdict. This adversarial approach forces each side to identify weaknesses in the other's reasoning, leading to more thorough analysis than a single LLM would produce.

DebateExecutor debate = DebateExecutor.builder()
    .proponent(proponentLLM)
    .opponent(opponentLLM)
    .judge(judgeLLM)
    .maxRounds(3)
    .format(DebateFormat.STRUCTURED)
    .timeout(Duration.ofMinutes(10))
    .build();

DebateResult result = debate.execute("Should we use microservices for this project?");

System.out.println("Verdict: " + result.verdict());
System.out.println("Confidence: " + result.confidence());  // 0.0 - 1.0
System.out.println("Rounds: " + result.roundCount());
System.out.println("Pro summary: " + result.proSummary());
System.out.println("Con summary: " + result.conSummary());

// Access individual rounds
for (DebateRound round : result.rounds()) {
    System.out.println("Round " + round.roundNumber());
    System.out.println("  Pro: " + round.proponentArgument());
    System.out.println("  Con: " + round.opponentArgument());
}

DebateFormat

The debate format controls the structure of the argumentation.

| Format | Description |
|---|---|
| STRUCTURED | Formal pro/con structure with sections |
| FREE_FORM | Open-ended discussion |

The judge produces a verdict with structured sections (VERDICT, REASONING, PRO SUMMARY, CON SUMMARY, CONFIDENCE). Confidence is extracted as a percentage from the judge's output.
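
Turning a percentage in free text into the 0.0 - 1.0 value returned by confidence() could look like the sketch below. The "CONFIDENCE: 85%" layout, the regular expression, and the class name are assumptions made for illustration; the judge's actual output format and the library's parsing are not documented here.

```java
import java.util.OptionalDouble;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration; not part of TnsAI.Coordination.
class ConfidenceParseSketch {

    // Matches e.g. "CONFIDENCE: 85%" or "Confidence 72.5 %" (assumed layout).
    private static final Pattern CONFIDENCE = Pattern.compile(
        "CONFIDENCE\\s*:?\\s*(\\d{1,3}(?:\\.\\d+)?)\\s*%", Pattern.CASE_INSENSITIVE);

    // Returns the confidence as a fraction in [0.0, 1.0], or empty if none is found.
    static OptionalDouble parse(String judgeOutput) {
        Matcher matcher = CONFIDENCE.matcher(judgeOutput);
        if (!matcher.find()) {
            return OptionalDouble.empty();
        }
        double percent = Double.parseDouble(matcher.group(1));
        double clamped = Math.max(0.0, Math.min(100.0, percent)); // guard against values above 100
        return OptionalDouble.of(clamped / 100.0);
    }

    public static void main(String[] args) {
        String output = "VERDICT: Use a modular monolith\nREASONING: ...\nCONFIDENCE: 85%";
        System.out.println(parse(output)); // prints "OptionalDouble[0.85]"
    }
}
```

Returning an empty value when no percentage is present keeps a malformed judge response distinguishable from a genuine confidence of zero.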
