Channels
The Channels module (TnsAI.Channels) is a multi-channel messaging gateway that connects TnsAI agents to external messaging platforms. It uses an adapter pattern with SPI discovery so new platforms can be added without modifying the core routing logic.
Built-in adapters: Telegram, CLI (REPL + JSON), Email (IMAP poll + SMTP send), Slack (Socket Mode), Discord (Gateway), WhatsApp (Cloud API). The Writing a New Channel Adapter section shows how to plug in your own platform via the SPI. The CLI adapter is streaming-capable — see StreamingChannelAdapter below.
Architecture
Messaging Platform --> ChannelAdapter --> UnifiedMessage --> MessageRouter --> Agent Layer
                                                                                |
Messaging Platform <-- ChannelAdapter <-- UnifiedResponse <---------------------+
The key design principles:
- Platform-agnostic message model: All platforms convert to/from UnifiedMessage and UnifiedResponse records
- SPI discovery: Adapters are loaded via ServiceLoader from META-INF/services
- Optional dependencies: Platform SDKs (Telegram Bot API, etc.) are optional Maven dependencies
- Capability-aware routing: Each adapter declares what it supports via ChannelCapabilities
Quick Start
// 1. Create registry and discover adapters on classpath
ChannelRegistry registry = new ChannelRegistry();
registry.discoverAdapters();
// 2. Create message router
MessageRouter router = new MessageRouter(registry);
router.setMessageHandler((message, callback) -> {
    // Process with your agent
    String response = agent.chat(message.text());
    callback.send(UnifiedResponse.builder()
        .channelId(message.channelId())
        .conversationId(message.conversationId())
        .text(response)
        .build());
});
// 3. Start all adapters
registry.startAll(router);
Package Layout
com.tnsai.channels
  +-- api/          Core interfaces and message models
  +-- annotation/   @Channel annotation
  +-- enums/        ChannelState lifecycle enum
  +-- model/        Attachment, MessageMeta, ResponseHints, ChannelError
  +-- registry/     SPI-based adapter discovery and lifecycle
  +-- routing/      MessageRouter, SessionRegistry
  +-- telegram/     Telegram Bot API adapter
  +-- cli/          Local REPL + JSON adapter (streaming-capable reference)
  +-- email/        IMAP poll + SMTP send adapter
  +-- slack/        Slack Socket Mode adapter
  +-- discord/      Discord Gateway adapter
  +-- whatsapp/     WhatsApp Cloud API adapter
Core Interfaces
ChannelAdapter
The main SPI contract every messaging platform must implement:
public interface ChannelAdapter {
    String id();                          // Unique identifier (e.g. "telegram")
    String displayName();                 // Human-readable name
    ChannelCapabilities capabilities();   // What the channel supports
    void start(ChannelContext context);   // Begin listening for messages
    void stop();                          // Tear down and release resources
    void send(UnifiedResponse response);  // Send a response back through the channel
    boolean isRunning();                  // Whether the adapter is operational
}
Lifecycle: ChannelRegistry calls start(context) once with a ChannelContext for delivering inbound events. Adapters call context.onMessage(unified) when a message arrives from the platform.
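The lifecycle described above can be sketched with a small in-memory adapter. This is only an illustration, not TnsAI code: the two records and ChannelContext below are trimmed local stand-ins that mirror just the members used here, and LoopbackAdapter with its receive(...) method is a hypothetical name introduced for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Trimmed local stand-ins for the real TnsAI types (assumption: only the members used here).
record UnifiedMessage(String channelId, String senderId, String conversationId, String text) {}
record UnifiedResponse(String channelId, String conversationId, String text) {}
interface ChannelContext { void onMessage(UnifiedMessage message); }

// Hypothetical in-memory adapter: start() keeps the context, receive() plays the platform.
final class LoopbackAdapter {
    private volatile ChannelContext context;                 // set by start(), cleared by stop()
    private final List<String> sent = new CopyOnWriteArrayList<>();

    String id() { return "loopback"; }

    void start(ChannelContext context) { this.context = context; }

    void stop() { this.context = null; }

    boolean isRunning() { return context != null; }

    // Simulates a platform event: convert to UnifiedMessage, then hand it to the gateway.
    void receive(String senderId, String conversationId, String text) {
        ChannelContext ctx = context;
        if (ctx == null) {
            throw new IllegalStateException("adapter not started");
        }
        ctx.onMessage(new UnifiedMessage(id(), senderId, conversationId, text));
    }

    // A real adapter would translate to the platform's native format here.
    void send(UnifiedResponse response) { sent.add(response.text()); }

    List<String> sent() { return List.copyOf(sent); }
}
```

Starting the adapter with a context that echoes each message back through send(...) is enough to exercise the full inbound and outbound path without any platform SDK.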
ChannelAdapterFactory
SPI factory for lazy adapter creation with classpath availability checks:
public interface ChannelAdapterFactory {
    String getType();                      // Channel type identifier
    String getDisplayName();               // Human-readable name
    ChannelAdapter create(Object config);  // Create adapter with platform-specific config
    boolean isAvailable();                 // Whether required SDK classes are on classpath
}
Factories are registered in META-INF/services/com.tnsai.channels.api.ChannelAdapterFactory and support configuration-driven, lazy instantiation.
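One plausible way to implement the availability check and the filtered discovery is shown below. It is a sketch under assumptions: ChannelAdapterFactory here is a trimmed local stand-in, and FactoryDiscoverySketch, isClassPresent, and discoverAvailable are hypothetical names. Only standard JDK calls (Class.forName and ServiceLoader.load) are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Trimmed stand-in for the factory SPI described above (assumption: two members only).
interface ChannelAdapterFactory {
    String getType();
    boolean isAvailable();
}

final class FactoryDiscoverySketch {
    private FactoryDiscoverySketch() {}

    // True when the named class can be located; the class is not initialised.
    static boolean isClassPresent(String className) {
        try {
            Class.forName(className, false, FactoryDiscoverySketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Iterates providers listed under META-INF/services and keeps the available ones.
    static List<ChannelAdapterFactory> discoverAvailable() {
        List<ChannelAdapterFactory> available = new ArrayList<>();
        for (ChannelAdapterFactory factory : ServiceLoader.load(ChannelAdapterFactory.class)) {
            if (factory.isAvailable()) {
                available.add(factory);
            }
        }
        return available;
    }
}
```

A factory's isAvailable() could then delegate to isClassPresent(...) with the SDK's entry-point class name, so the factory itself never links against the optional dependency.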
ChannelContext
Callback interface provided to adapters for delivering inbound events to the gateway:
public interface ChannelContext {
    void onMessage(UnifiedMessage message);  // Inbound message arrived
    void onTyping(String conversationId);    // Remote user is typing
    void onError(ChannelError error);        // Adapter encountered an error
}
ChannelDiscovery
Utility for discovering available adapter factories via ServiceLoader:
// Only factories whose SDK is on the classpath
List<ChannelAdapterFactory> factories = ChannelDiscovery.discoverFactories();
// All registered factories, regardless of availability
List<ChannelAdapterFactory> all = ChannelDiscovery.discoverAllFactories();
StreamingChannelAdapter (mixin)
Since 0.10.2. Mixin interface for adapters that want per-token delivery as the agent's reply streams, rather than waiting for the assembled UnifiedResponse:
public interface StreamingChannelAdapter extends ChannelAdapter {
    void sendChunk(UnifiedChunk chunk);
}
Because it extends ChannelAdapter, every StreamingChannelAdapter is also a regular adapter. Gateway code typically does an instanceof check and dispatches chunks through sendChunk(...) as they arrive, then still calls send(UnifiedResponse) once with the assembled reply for non-streaming downstreams (logging, audit). Adapters that don't implement the mixin keep working unchanged: send(UnifiedResponse) remains the single delivery point.
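The instanceof dispatch described above might look roughly like this on the gateway side. The types are trimmed local stand-ins rather than the real TnsAI declarations, and StreamDispatchSketch.deliver is a hypothetical helper; the actual gateway plumbing may differ.

```java
import java.util.List;

// Trimmed local stand-ins (assumption: the real records carry more fields).
record UnifiedChunk(String conversationId, String delta, boolean done) {}
record UnifiedResponse(String conversationId, String text) {}
interface ChannelAdapter { void send(UnifiedResponse response); }
interface StreamingChannelAdapter extends ChannelAdapter { void sendChunk(UnifiedChunk chunk); }

final class StreamDispatchSketch {
    private StreamDispatchSketch() {}

    // Forwards each delta to streaming adapters, then always sends the assembled reply once.
    static UnifiedResponse deliver(ChannelAdapter adapter, String conversationId, List<String> deltas) {
        StringBuilder assembled = new StringBuilder();
        for (String delta : deltas) {
            assembled.append(delta);
            if (adapter instanceof StreamingChannelAdapter streaming) {
                streaming.sendChunk(new UnifiedChunk(conversationId, delta, false));
            }
        }
        if (adapter instanceof StreamingChannelAdapter streaming) {
            // Empty terminal chunk marks the end of the stream.
            streaming.sendChunk(new UnifiedChunk(conversationId, "", true));
        }
        UnifiedResponse response = new UnifiedResponse(conversationId, assembled.toString());
        adapter.send(response);   // non-streaming adapters only ever see this call
        return response;
    }
}
```

A plain adapter passed to deliver(...) receives a single send(...) call, while a streaming adapter sees every delta, a terminal chunk, and then the same assembled response.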
The reference implementation is CliChannel (TNS-440 / #335):
- REPL mode prints the assistant: prefix once on the first chunk, then concatenates deltas inline.
- JSON mode emits one {"type":"chunk","content":"...","done":...} record per chunk.
- The post-stream send(UnifiedResponse) is suppressed (the body was already rendered chunk-by-chunk); a suppressNextSend latch resets after one consumption so subsequent standalone send() calls render normally.
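The REPL behaviour and the one-shot latch can be illustrated with the sketch below. It is not CliChannel's actual code: the class name, the string-based method signatures, the exact "assistant: " prefix spacing, and the choice to arm the latch on every rendered chunk are all assumptions made for the example. It is also single-threaded apart from the latch itself.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical REPL-style renderer illustrating a one-shot suppression latch.
final class SuppressLatchSketch {
    private final AtomicBoolean suppressNextSend = new AtomicBoolean(false);
    private final StringBuilder out = new StringBuilder();
    private boolean midStream = false;

    // Renders a delta inline; the prefix is printed once, on the first chunk of a reply.
    void sendChunk(String delta, boolean done) {
        if (!midStream) {
            out.append("assistant: ");
            midStream = true;
        }
        out.append(delta);
        suppressNextSend.set(true);       // the body is now on screen; skip the next send()
        if (done) {
            out.append('\n');
            midStream = false;
        }
    }

    // Renders a full reply unless the latch says it was already streamed.
    void send(String text) {
        if (suppressNextSend.compareAndSet(true, false)) {
            return;                       // consumed once; later send() calls render normally
        }
        out.append("assistant: ").append(text).append('\n');
    }

    String rendered() { return out.toString(); }
}
```

Using compareAndSet makes the check and the reset a single atomic step, so the latch cannot swallow two sends.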
For the Sona-side gateway plumbing that fans framework Agent.streamChatWithTools chunks through this mixin, see Sona → How It Works.
UnifiedChunk
Since 0.10.2. The delta record that StreamingChannelAdapter.sendChunk carries:
public record UnifiedChunk(
    String conversationId,         // Same key as the parent UnifiedResponse
    String delta,                  // Text fragment (may be empty for control chunks)
    boolean done,                  // true for the final chunk in a reply
    Map<String, Object> metadata,  // Free-form per-chunk metadata; null treated as empty
    Instant timestamp              // When the chunk was emitted
) { ... }
The compact constructor null-normalises delta to "" and metadata to Map.of(), defensively copies metadata, and requires conversationId and timestamp. Two factory methods cover the common cases:
UnifiedChunk.text(conversationId, "hello"); // non-terminal text chunk
UnifiedChunk.done(conversationId);          // final empty chunk with done=true
done is the streaming sentinel, not the lifecycle terminator: the matching UnifiedResponse still arrives via send(...) afterward unless the adapter explicitly suppresses it (as CliChannel does).
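A compact constructor with the behaviour described above could be written as follows. This is a reconstruction from the prose, not the library source: the use of Objects.requireNonNull (and therefore NullPointerException) for the required fields, Map.copyOf for the defensive copy, and Instant.now() in the factories are assumptions.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Objects;

// Sketch of the record with the normalisation rules from the text.
record UnifiedChunk(String conversationId, String delta, boolean done,
                    Map<String, Object> metadata, Instant timestamp) {

    UnifiedChunk {
        Objects.requireNonNull(conversationId, "conversationId");
        Objects.requireNonNull(timestamp, "timestamp");
        delta = (delta == null) ? "" : delta;
        // Map.copyOf returns an unmodifiable copy; it rejects null keys and values.
        metadata = (metadata == null) ? Map.of() : Map.copyOf(metadata);
    }

    // Non-terminal text chunk.
    static UnifiedChunk text(String conversationId, String delta) {
        return new UnifiedChunk(conversationId, delta, false, null, Instant.now());
    }

    // Final empty chunk with done=true.
    static UnifiedChunk done(String conversationId) {
        return new UnifiedChunk(conversationId, "", true, null, Instant.now());
    }
}
```

Because the copy is taken inside the constructor, later changes to the caller's map do not leak into a chunk that has already been created.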
Message Models
UnifiedMessage (Inbound)
Platform-agnostic inbound message. Every adapter converts its native format into this record before handing it to the router.
public record UnifiedMessage(
    String channelId,         // Adapter id (e.g. "telegram")
    String senderId,          // Platform-specific user id
    String senderName,        // Human-readable sender name
    String conversationId,    // Platform-specific chat/channel id
    String text,              // Message text (null for media-only)
    List<Attachment> attachments,
    MessageMeta meta,         // Reply-to, thread id, platform message id
    Instant timestamp
) {
    // Convenience method: unique session key
    public String sessionKey(); // Returns "channelId:conversationId"
}
Validation: channelId, senderId, and conversationId must be non-blank. Attachments are defensively copied.
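The validation rules and sessionKey() can be sketched on a trimmed version of the record. The field list is shortened, Attachment is a two-field stand-in, and two details are assumptions not stated in the text: blank ids are rejected with IllegalArgumentException, and a null attachment list is treated as empty.

```java
import java.util.List;

// Two-field stand-in for the real Attachment record.
record Attachment(String fileName, String mimeType) {}

// Trimmed sketch: senderName, meta and timestamp are omitted for brevity.
record UnifiedMessage(String channelId, String senderId, String conversationId,
                      String text, List<Attachment> attachments) {

    UnifiedMessage {
        requireNonBlank(channelId, "channelId");
        requireNonBlank(senderId, "senderId");
        requireNonBlank(conversationId, "conversationId");
        // Defensive, unmodifiable copy; null is treated as "no attachments" (assumption).
        attachments = (attachments == null) ? List.of() : List.copyOf(attachments);
    }

    private static void requireNonBlank(String value, String name) {
        if (value == null || value.isBlank()) {
            throw new IllegalArgumentException(name + " must be non-blank");
        }
    }

    // Unique session key in the "channelId:conversationId" form.
    String sessionKey() {
        return channelId + ":" + conversationId;
    }
}
```

Validating in the compact constructor means every construction path, including a builder, goes through the same checks.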
Build with the fluent builder:
UnifiedMessage message = UnifiedMessage.builder()
    .channelId("telegram")
    .senderId("123456")
    .senderName("Alice")
    .conversationId("789")
    .text("Hello, agent!")
    .timestamp(Instant.now())
    .build();
UnifiedResponse (Outbound)
Platform-agnostic outbound response. The gateway produces this after agent processing; the adapter converts it into the native platform format.
public record UnifiedResponse(
    String channelId,              // Target adapter id
    String conversationId,         // Target chat/channel
    String text,                   // Response text
    List<Attachment> attachments,  // Files to send
    ResponseHints hints            // Delivery hints
) {}
Build with the fluent builder:
UnifiedResponse response = UnifiedResponse.builder()
    .channelId("telegram")
    .conversationId("789")
    .text("Here is your answer.")
    .hints(ResponseHints.withStreaming())
    .build();
Attachment
A file or media item attached to a message or response:
public record Attachment(
    String fileName,
    String mimeType,
    String url,           // Remote URL (null for local data)
    byte[] data,          // Raw bytes (null for remote)
    long sizeBytes,       // -1 if unknown
    AttachmentType type   // IMAGE, AUDIO, VIDEO, DOCUMENT, VOICE, STICKER, OTHER
) {
    // Factory methods
    static Attachment ofUrl(String fileName, String mimeType, String url, AttachmentType type);
    static Attachment ofData(String fileName, String mimeType, byte[] data, AttachmentType type);
}
MessageMeta
Optional platform-specific metadata:
public record MessageMeta(
    String replyToMessageId,    // Message this is replying to
    String threadId,            // Thread identifier
    String platformMessageId,   // Native message id from source platform
    Map<String, String> extra   // Arbitrary platform-specific data
) {
    static MessageMeta of(String platformMessageId);
    static MessageMeta reply(String platformMessageId, String replyToMessageId);
}
ResponseHints
Controls how the adapter delivers a response:
public record ResponseHints(
    boolean streaming,          // Token-by-token delivery if supported
    boolean showTyping,         // Send typing indicator first
    boolean splitLongMessages,  // Auto-split text exceeding channel max length
    String replyToMessageId     // Native message id to reply to
) {
    static ResponseHints defaults();       // typing=true, split=true
    static ResponseHints withStreaming();  // streaming=true, typing=true, split=true
    static ResponseHints replyTo(String messageId);
}
ChannelError
Error record with severity classification:
public record ChannelError(
    String channelId,
    Severity severity,   // TRANSIENT, DEGRADED, FATAL
    String message,
    Throwable cause,
    Instant timestamp
) {
    static ChannelError transient_(String channelId, String message, Throwable cause);
    static ChannelError degraded(String channelId, String message, Throwable cause);
    static ChannelError fatal(String channelId, String message, Throwable cause);
}
ChannelCapabilities
Each adapter declares what it supports. The router and response formatter use this to adapt behavior per channel.
public record ChannelCapabilities(
    boolean threads,        // Threaded conversations
    boolean reactions,      // Emoji reactions
    boolean fileUpload,     // File/document uploads
    boolean voice,          // Voice messages
    boolean typing,         // Typing indicators
    boolean editing,        // Message editing (for streaming)
    boolean streaming,      // Token-by-token streaming via editing
    long maxMessageLength   // Max text length per message
) {}
Build with the fluent builder:
ChannelCapabilities capabilities = ChannelCapabilities.builder()
    .threads(false)
    .reactions(true)
    .fileUpload(true)
    .voice(true)
    .typing(true)
    .editing(true)
    .streaming(true)
    .maxMessageLength(4096)
    .build();
ChannelState
Lifecycle state machine for adapters:
REGISTERED --> STARTING --> RUNNING --> STOPPING --> STOPPED
                  |            |
                  v            v
               FAILED       DEGRADED
| State | Meaning |
|---|---|
| REGISTERED | Registered but not yet started |
| STARTING | Currently connecting to platform |
| RUNNING | Accepting messages |
| DEGRADED | Temporarily impaired (rate limited, reconnecting) |
| STOPPING | Shutting down |
| STOPPED | No longer accepting messages |
| FAILED | Fatal error during start or operation |
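If an adapter or registry wants to guard state changes, a transition table is one option. The sketch below is illustrative only: the enum is a local copy of the seven states, ChannelStateTransitions is a hypothetical helper, and the allowed transitions are an assumption read off the diagram and the table (in particular, recovery from DEGRADED back to RUNNING and FAILED being reachable from RUNNING are guesses, and STOPPED and FAILED are treated as terminal).

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Local copy of the lifecycle states listed in the table.
enum ChannelState { REGISTERED, STARTING, RUNNING, DEGRADED, STOPPING, STOPPED, FAILED }

// Hypothetical guard; the transition set below is an assumption, not documented behaviour.
final class ChannelStateTransitions {
    private static final Map<ChannelState, Set<ChannelState>> ALLOWED = Map.of(
        ChannelState.REGISTERED, EnumSet.of(ChannelState.STARTING),
        ChannelState.STARTING, EnumSet.of(ChannelState.RUNNING, ChannelState.FAILED),
        ChannelState.RUNNING, EnumSet.of(ChannelState.DEGRADED, ChannelState.STOPPING, ChannelState.FAILED),
        ChannelState.DEGRADED, EnumSet.of(ChannelState.RUNNING, ChannelState.STOPPING, ChannelState.FAILED),
        ChannelState.STOPPING, EnumSet.of(ChannelState.STOPPED),
        ChannelState.STOPPED, EnumSet.noneOf(ChannelState.class),
        ChannelState.FAILED, EnumSet.noneOf(ChannelState.class));

    private ChannelStateTransitions() {}

    // True when moving from one state to the other is permitted by the table above.
    static boolean canTransition(ChannelState from, ChannelState to) {
        return ALLOWED.get(from).contains(to);
    }
}
```

Keeping the rules in one map makes it easy to adjust them once the real lifecycle semantics are confirmed.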
Registry and Routing
ChannelRegistry
Discovers and manages adapter lifecycle. Adapters can be loaded via SPI or registered programmatically.
public class ChannelRegistry {
    void discoverAdapters();                 // Load via ServiceLoader
    void register(ChannelAdapter adapter);   // Register programmatically
    void startAdapter(String channelId, ChannelContext context);
    void startAll(ChannelContext context);
    void stopAdapter(String channelId);
    void stopAll();
    Optional<ChannelAdapter> get(String channelId);
    ChannelState getState(String channelId);
    Collection<ChannelAdapter> getAllAdapters();
    Collection<String> getRegisteredIds();
}
Thread-safe: backed by ConcurrentHashMap. Duplicate registrations are logged and ignored.
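The "logged and ignored" behaviour maps naturally onto ConcurrentHashMap.putIfAbsent. The following is a sketch of that idea, not the registry's real implementation: ChannelAdapter is reduced to id(), RegistrySketch is a hypothetical name, java.util.logging stands in for whatever logger the module uses, and register returns a boolean here (the documented method returns void) purely so the outcome is observable.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;

// Stand-in reduced to the one member this sketch needs.
interface ChannelAdapter { String id(); }

final class RegistrySketch {
    private static final Logger LOG = Logger.getLogger(RegistrySketch.class.getName());
    private final ConcurrentMap<String, ChannelAdapter> adapters = new ConcurrentHashMap<>();

    // First registration for an id wins; later ones are logged and dropped.
    boolean register(ChannelAdapter adapter) {
        ChannelAdapter existing = adapters.putIfAbsent(adapter.id(), adapter);
        if (existing != null) {
            LOG.warning("Duplicate registration ignored for channel '" + adapter.id() + "'");
            return false;
        }
        return true;
    }

    Optional<ChannelAdapter> get(String channelId) {
        return Optional.ofNullable(adapters.get(channelId));
    }
}
```

putIfAbsent performs the check and the insert atomically, so two threads registering the same id cannot both succeed.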
MessageRouter
The central routing hub that bridges the channel layer with the agent layer. Implements ChannelContext so it receives all inbound events from adapters.
public class MessageRouter implements ChannelContext {
    MessageRouter(ChannelRegistry registry);
    // Connect to agent layer
    void setMessageHandler(BiConsumer<UnifiedMessage, ResponseCallback> handler);
    // Proactive outbound messaging
    void sendProactive(String channelId, String conversationId, String text);
    SessionRegistry getSessions();
    // Callback interface for agent responses
    @FunctionalInterface
    interface ResponseCallback {
        void send(UnifiedResponse response);
    }
}
The message handler receives the inbound message and a ResponseCallback for sending the agent's response back to the originating channel. This keeps TnsAI.Channels decoupled from specific agent implementations.
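To make the handler and callback relationship concrete, here is a stripped-down router. It is a sketch under assumptions rather than MessageRouter's real internals: the records and ChannelAdapter are trimmed stand-ins, RouterSketch and its register(...) method are hypothetical, and messages that arrive before a handler is set are simply dropped.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Trimmed local stand-ins for the real TnsAI types.
record UnifiedMessage(String channelId, String conversationId, String text) {}
record UnifiedResponse(String channelId, String conversationId, String text) {}
interface ChannelAdapter { void send(UnifiedResponse response); }

@FunctionalInterface
interface ResponseCallback { void send(UnifiedResponse response); }

// Hypothetical router: inbound message -> handler -> callback -> target adapter.
final class RouterSketch {
    private final Map<String, ChannelAdapter> adapters = new ConcurrentHashMap<>();
    private volatile BiConsumer<UnifiedMessage, ResponseCallback> handler;

    void register(String channelId, ChannelAdapter adapter) {
        adapters.put(channelId, adapter);
    }

    void setMessageHandler(BiConsumer<UnifiedMessage, ResponseCallback> handler) {
        this.handler = handler;
    }

    // Called by adapters; the callback resolves the adapter from the response's channelId.
    void onMessage(UnifiedMessage message) {
        BiConsumer<UnifiedMessage, ResponseCallback> current = handler;
        if (current == null) {
            return;   // no agent attached yet (assumption: drop silently)
        }
        current.accept(message, response -> {
            ChannelAdapter target = adapters.get(response.channelId());
            if (target != null) {
                target.send(response);
            }
        });
    }
}
```

The handler never touches an adapter directly; it only sees the message and the callback, which is what keeps the agent layer independent of any particular channel.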
SessionRegistry
Tracks active conversations with idle reaping:
public class SessionRegistry {
    void touch(String sessionKey, String channelId, String conversationId);
    Optional<ChannelSession> get(String sessionKey);
    Collection<ChannelSession> getAll();
    void remove(String sessionKey);
    int reapIdle(Duration maxIdle); // Remove sessions idle longer than threshold
    int size();
    record ChannelSession(
        String sessionKey,      // "channelId:conversationId"
        String channelId,
        String conversationId,
        Instant createdAt,
        Instant lastActive
    ) {}
}
@Channel Annotation
Marks a class as a channel adapter for registry discovery:
@Channel(id = "telegram", displayName = "Telegram")
public class TelegramAdapter implements ChannelAdapter { ... }
| Element | Type | Description |
|---|---|---|
| id | String | Unique channel identifier (required) |
| displayName | String | Human-readable name for logs and UI |
Reference Implementation: Telegram
The TelegramAdapter is the reference adapter implementation, showing how to build a complete channel integration.
Class: com.tnsai.channels.telegram.TelegramAdapter
Features:
- Long-polling via com.pengrad.telegrambot.TelegramBot
- Extracts text, photos, documents, voice, audio, video, stickers
- Sends typing indicators before responses
- Auto-splits long messages at newline/space boundaries (max 4096 chars)
- Reply-to support via ReplyParameters
- Token resolution from TELEGRAM_BOT_TOKEN env var or telegram.bot.token system property
@Channel(id = "telegram", displayName = "Telegram")
public class TelegramAdapter implements ChannelAdapter {
    @Override
    public ChannelCapabilities capabilities() {
        return ChannelCapabilities.builder()
            .threads(false).reactions(true).fileUpload(true)
            .voice(true).typing(true).editing(true)
            .streaming(true).maxMessageLength(4096)
            .build();
    }
    // ... start(), stop(), send(), isRunning()
}
Factory (TelegramAdapterFactory): Reports isAvailable() == true only when com.pengrad.telegrambot.TelegramBot is on the classpath.
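The token lookup listed under Features (environment variable or system property) might be implemented along these lines. TokenResolverSketch is a hypothetical helper, and the precedence shown, environment variable first and system property second, is an assumption; the source does not say which one wins.

```java
import java.util.Optional;

// Hypothetical helper: resolves a secret from an env var, then a system property.
final class TokenResolverSketch {
    private TokenResolverSketch() {}

    static Optional<String> resolve(String envVar, String systemProperty) {
        String fromEnv = System.getenv(envVar);
        if (fromEnv != null && !fromEnv.isBlank()) {
            return Optional.of(fromEnv);          // assumption: env var takes precedence
        }
        String fromProperty = System.getProperty(systemProperty);
        if (fromProperty != null && !fromProperty.isBlank()) {
            return Optional.of(fromProperty);
        }
        return Optional.empty();                  // caller decides how to report a missing token
    }
}
```

For the Telegram adapter the call would be resolve("TELEGRAM_BOT_TOKEN", "telegram.bot.token"); returning an Optional leaves it to the caller to decide whether a missing token is a fatal start error.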
Writing a New Channel Adapter
To add a new messaging platform — e.g., a Microsoft Teams adapter or an SMS gateway:
- Create a package: com.tnsai.channels.myplatform/
- Implement ChannelAdapter, annotate with @Channel(id = "myplatform", displayName = "My Platform")
- Implement ChannelAdapterFactory with isAvailable() checking for the platform SDK
- Register in META-INF/services/com.tnsai.channels.api.ChannelAdapterFactory
- Add the platform SDK as an <optional> dependency in pom.xml
Key requirements:
- Thread safety: adapters may receive concurrent messages
- Convert all inbound messages to UnifiedMessage before calling context.onMessage()
- Convert UnifiedResponse back to the platform's native format in send()
- Handle long messages by respecting maxMessageLength from capabilities
- Report errors via context.onError() with appropriate Severity
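For the long-message requirement, a splitter that prefers newline boundaries, then spaces, and only then cuts mid-word could look like the sketch below. MessageSplitterSketch is a hypothetical helper, not the Telegram adapter's actual code. It takes an int limit (the capabilities record uses long), counts UTF-16 code units, drops the whitespace at each split point, and a hard cut may land inside a surrogate pair.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical splitter: newline boundary first, then space, then a hard cut.
final class MessageSplitterSketch {
    private MessageSplitterSketch() {}

    static List<String> split(String text, int maxLength) {
        if (maxLength <= 0) {
            throw new IllegalArgumentException("maxLength must be positive");
        }
        List<String> parts = new ArrayList<>();
        String rest = text;
        while (rest.length() > maxLength) {
            // Search backwards from maxLength so the part never exceeds the limit.
            int cut = rest.lastIndexOf('\n', maxLength);
            if (cut <= 0) {
                cut = rest.lastIndexOf(' ', maxLength);
            }
            if (cut <= 0) {
                cut = maxLength;                  // no usable boundary: cut mid-word
            }
            parts.add(rest.substring(0, cut));
            rest = rest.substring(cut).stripLeading();
        }
        if (!rest.isEmpty()) {
            parts.add(rest);
        }
        return parts;
    }
}
```

An adapter's send() could call split(response.text(), limit) and deliver the parts in order, with the limit taken from its own ChannelCapabilities.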