Agent Data Flow

How agents pass files, results, and metadata between each other

Architecture Overview

Every agent declares what data it consumes and produces via an AgentContract. Both server-side execution paths (the chat loop and the Pipeline API) use these contracts to populate each agent's inputs from a shared per-request state and route its outputs back into the same state. Agents pass lightweight file references (IDs) and resolve actual content on demand via a FileResolver.

The Pipeline builder gives you a fluent API to declare a multi-step run; the server schedules steps into topological waves and threads data between them through that shared state. This page covers the underlying SDK primitives (AgentContract, ExecutionContext, FileCollection, FileResolver) for building custom client-side orchestration around the same data shapes.
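The wave scheduling can be pictured with a minimal, self-contained sketch. The step names and the `deps` mapping below are illustrative, and the stdlib `graphlib` scheduler stands in for the server's real logic:

```python
from graphlib import TopologicalSorter

# Hypothetical step dependency graph: each step maps to the steps it consumes from.
deps = {
    "search_images": set(),
    "search_documents": set(),
    "analyze": {"search_images", "search_documents"},
    "organize": {"analyze"},
}

def schedule_waves(deps):
    """Group steps into waves: each wave only depends on earlier waves."""
    ts = TopologicalSorter(deps)
    ts.prepare()
    waves = []
    while ts.is_active():
        ready = list(ts.get_ready())  # all steps whose dependencies are satisfied
        waves.append(sorted(ready))   # one wave; steps within it can run in parallel
        ts.done(*ready)
    return waves

print(schedule_waves(deps))
# → [['search_documents', 'search_images'], ['analyze'], ['organize']]
```

Steps in the same wave have no data dependency on each other, which is what lets the server execute them concurrently while still threading outputs forward through the shared state.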

Building Your Own Agent

The Agent Protocol

To create an agent, implement two methods: get_contract() and execute(). No inheritance required — the SDK uses structural typing via a Protocol.

python
from scopix.types.contracts import (
    AgentCapability, AgentContract, TypedInput, TypedOutput,
)
from scopix.types.agent import AgentResult
from scopix.types.context import ExecutionContext
from scopix.types.file_collection import FileCollection

class MySearchAgent:
    def get_contract(self) -> AgentContract:
        return AgentContract(
            name="MySearchAgent",
            capability=AgentCapability.SEARCH,
            description="Searches images by description",
            inputs=(
                TypedInput(name="query", data_type="TEXT", description="Search query"),
            ),
            outputs=(
                TypedOutput(name="results", data_type="FILE_IDS", mergeable=True),
            ),
            can_chain_with=(AgentCapability.ANALYSIS, AgentCapability.ORGANIZATION),
        )

    async def execute(self, context: ExecutionContext) -> AgentResult:
        query = context.read("query")
        # ... perform search logic ...
        found_ids = ["img-1", "img-2", "img-3"]
        results = FileCollection.from_ids(found_ids, content_type="images").to_dict()
        # Write outputs to context for downstream agents
        context.write("results", results, "FILE_IDS", "MySearchAgent")
        return AgentResult(
            success=True,
            agent_name="MySearchAgent",
            capability="search",
            summary=f"Found {len(found_ids)} images for '{query}'",
        )

Chaining Agents with ExecutionContext

The ExecutionContext is the shared state container. Agent A writes results to a slot, Agent B reads from that slot. Each slot is typed and tracks provenance.

python
from scopix.types.context import ExecutionContext

# Create shared context
ctx = ExecutionContext()

# Agent A writes search results
ctx.write("search_results", {"ids": ["img-1", "img-2"]}, "FILE_IDS", "search_agent")

# Agent B reads them and writes categorization
file_data = ctx.read("search_results", expected_type="FILE_IDS")
ctx.write("categorization", {
    "categories": [{"name": "Poles", "file_ids": ["img-1", "img-2"]}],
    "total_files": 2,
}, "CATEGORIZATION", "categorize_agent")

# Agent C reads both slots
cat = ctx.read("categorization", expected_type="CATEGORIZATION")
files = ctx.read("search_results")

# Inspect context state
ctx.list_slots()
# [{"name": "search_results", "type": "FILE_IDS", "source": "search_agent"},
#  {"name": "categorization", "type": "CATEGORIZATION", "source": "categorize_agent"}]

# Serialize/restore for checkpointing
checkpoint = ctx.to_dict()
restored = ExecutionContext.from_dict(checkpoint)

Data Type Registry

The DataTypeRegistry tracks which data types exist and how to merge them. Use get_default_registry() for a pre-loaded registry, or build your own.

python
from scopix.types.data_types import (
    DataTypeDefinition, DataTypeRegistry, get_default_registry,
)

# Pre-loaded with built-in types
registry = get_default_registry()
registry.list_types()
# ["FILE_IDS", "CATEGORIZATION", "FOLDER_RESULT", "ANALYSIS_RESULT", "TEXT", "CROSS_REF"]

# FILE_IDS and CATEGORIZATION support merging
registry.can_merge("FILE_IDS")  # True — uses FileCollection.merge()
registry.can_merge("TEXT")      # False

# Register your own types
registry.register(
    DataTypeDefinition(name="EMBEDDINGS", description="Vector embeddings"),
    merge_fn=lambda *arrs: [v for arr in arrs for v in arr],  # Concatenate
)

Implementing a FileResolver

Implement the FileResolver Protocol to connect your own storage backend. Agents use it to resolve FileRef objects to actual content bytes.

python
import uuid

import aiofiles

from scopix.types.file_collection import FileRef
from scopix.types.file_resolver import FileResolver, FileResolutionError

class LocalFileResolver:
    def __init__(self, base_path: str):
        self.base_path = base_path

    async def resolve(self, ref, hints=None):
        path = f"{self.base_path}/{ref.id}"
        try:
            async with aiofiles.open(path, "rb") as f:
                return await f.read()
        except FileNotFoundError:
            raise FileResolutionError(f"File not found: {ref.id}")

    async def resolve_batch(self, refs, batch_size=100):
        for ref in refs:
            yield ref, await self.resolve(ref)

    async def resolve_metadata(self, ref):
        return ref  # Could enrich with file stats

    async def store(self, content, media_type, metadata=None):
        file_id = str(uuid.uuid4())
        path = f"{self.base_path}/{file_id}"
        async with aiofiles.open(path, "wb") as f:
            await f.write(content)
        return FileRef(id=file_id, media_type=media_type)

How It Works (Backend)

Backend Reference

This section describes how Scopix's backend orchestrates agents in the chat path (and analogously in the Pipeline API). The Pipeline builder lets you drive this same per-call agent execution path from a single HTTP request — you don't need to reimplement any of this logic. The primitives below (contracts, context, registry) are available if you want to build custom client-side orchestration around the same data shapes.

The Data Flow Cycle

The orchestrator uses an agentic ReAct loop (Reason → Act → Observe). Each chat message follows this cycle:

  1. Reason — The orchestrator LLM decides which tool (agent) to invoke next based on the user's query and previous observations
  2. Contract-driven input building — The agent's contract declares what data it needs via TypedInput. The system reads matching fields from ConversationState based on (data_type, content_type_hint) and wraps them as FileCollection-shaped inputs
  3. Execute — The agent runs with a fresh database session (isolation per tool call) and a per-agent timeout
  4. Contract-driven output routing — A ContextRouter reads the agent's contract TypedOutput declarations and routes results to the correct ConversationState fields (e.g. FILE_IDS with hint "images" → current_image_ids)
  5. Observe — The LLM receives a concise summary of the result (not the raw data) and decides whether to invoke another agent or respond
  6. Context persistence — Updated conversation state carries forward to future turns
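The cycle above can be reduced to a loop like the following. This is a simplified, self-contained sketch: the toy agents, slot names, and the fixed plan standing in for the LLM are all illustrative, not the real orchestrator:

```python
# Minimal stand-in for the shared per-request state.
state = {"current_image_ids": None, "current_categorization": None}

# Toy agents registered as (input_slot, output_slot, fn) — illustrative only.
def search(_):
    return {"ids": ["img-1", "img-2"], "content_type": "images"}

def analyze(files):
    return {"categories": [{"name": "Sunsets", "file_ids": files["ids"]}]}

agents = {
    "search_images": (None, "current_image_ids", search),
    "analyze_images": ("current_image_ids", "current_categorization", analyze),
}

def run_turn(plan):
    observations = []
    for tool in plan:                                    # 1. Reason (fixed plan stands in for the LLM)
        in_slot, out_slot, fn = agents[tool]
        inputs = state[in_slot] if in_slot else None     # 2. build inputs from shared state
        result = fn(inputs)                              # 3. execute the agent
        state[out_slot] = result                         # 4. route output back into state
        observations.append(f"{tool}: ok")               # 5. observe a summary, not raw data
    return observations                                  # 6. state persists for future turns

print(run_turn(["search_images", "analyze_images"]))
# → ['search_images: ok', 'analyze_images: ok']
```

The key property the sketch preserves is that agents never call each other directly: everything flows through the state, and the reasoner only ever sees summaries.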

Contract-Driven Routing

Adding a new agent requires only defining its AgentContract. The input building and output routing are driven entirely by the contract's TypedInput/TypedOutput declarations and their content_type_hint fields — no hardcoded agent-name wiring needed.
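Conceptually, this routing is a lookup from (data_type, content_type_hint) to a state field. The table and field names below are a simplified sketch of the mapping described above, not the actual ContextRouter implementation:

```python
# Illustrative routing table from contract output types to conversation-state fields.
ROUTES = {
    ("FILE_IDS", "images"): "current_image_ids",
    ("FILE_IDS", "documents"): "current_document_ids",  # hypothetical field name
    ("CATEGORIZATION", None): "current_categorization",
}

def route_outputs(contract_outputs, results, state):
    """Write each produced value into the state field its typed output maps to."""
    for out in contract_outputs:
        key = (out["data_type"], out.get("content_type_hint"))
        field = ROUTES.get(key)
        if field is not None:          # unmapped outputs (e.g. FOLDER_RESULT) are skipped
            state[field] = results[out["name"]]
    return state

state = route_outputs(
    [{"name": "results", "data_type": "FILE_IDS", "content_type_hint": "images"}],
    {"results": {"ids": ["img-1", "img-2"], "content_type": "images"}},
    {},
)
print(state["current_image_ids"]["ids"])  # → ['img-1', 'img-2']
```

Because the table is keyed purely by declared types and hints, registering a new agent never requires touching the router: its contract is the wiring.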

Example Pipeline

A user message like "Find sunset images and organize them into folders" produces this sequence:

text
Iteration 1: LLM selects → search_images("sunset images")
  ┌─ Input: contract has no required FILE_IDS inputs
  ├─ Execute: ImageSearchAgent runs, returns 47 image IDs
  └─ Route: contract output (FILE_IDS, hint="images")
       → ContextRouter writes conversation.current_image_ids = [47 IDs]
       → content_type = "images"

Iteration 2: LLM observes "Found 47 images" → selects analyze_images("categorize")
  ┌─ Input: contract input (FILE_IDS, hint="images")
  │    → reads conversation.current_image_ids → wraps as FileCollection
  ├─ Execute: AnalysisAgent runs, returns 5 categories
  └─ Route: contract output (CATEGORIZATION)
       → ContextRouter writes conversation.current_categorization

Iteration 3: LLM observes "5 categories" → selects manage_folders("organize")
  ┌─ Input: contract input (CATEGORIZATION)
  │    → reads conversation.current_categorization
  ├─ Execute: FolderAgent creates folders, moves files
  └─ Route: contract output (FOLDER_RESULT) → no ConversationState mapping

LLM responds: "Found 47 sunset images and organized into 5 folders"

Agent Contracts

Declaring Inputs and Outputs

Every agent declares a contract specifying the data types it consumes and produces. On the server, these contracts drive the content-type-hint-based input population and output routing that lets chained agents communicate without hard-coded wiring.

python
from scopix.types.contracts import AgentContract, AgentCapability, TypedInput, TypedOutput

# A search agent that PRODUCES file references
contract = AgentContract(
    name="ImageSearchAgent",
    capability=AgentCapability.SEARCH,
    description="Searches images by natural language",
    inputs=(
        TypedInput(
            name="folder_scope",
            data_type="FILE_IDS",
            required=False,
            description="Optional folder to restrict search scope",
        ),
    ),
    outputs=(
        TypedOutput(
            name="results",
            data_type="FILE_IDS",        # Produces a FileCollection
            mergeable=True,              # Can be merged with other FILE_IDS
            content_type_hint="images",  # Contains image references
        ),
    ),
)
python
# An analysis agent that CONSUMES file references and PRODUCES categorization
contract = AgentContract(
    name="AnalysisAgent",
    capability=AgentCapability.ANALYSIS,
    description="Categorizes files into semantic groups",
    inputs=(
        TypedInput(
            name="file_ids",
            data_type="FILE_IDS",        # Expects a FileCollection
            required=False,
            content_type_hint="images",  # Specifically image files
        ),
    ),
    outputs=(
        TypedOutput(name="categorization", data_type="CATEGORIZATION", mergeable=True),
        TypedOutput(name="analysis", data_type="ANALYSIS_RESULT", mergeable=False),
    ),
)

Content Type Hints

The content_type_hint on contract inputs and outputs tells the graph what kind of files an agent works with. This enables content-aware routing — image search results are routed to image-consuming agents, document results to document agents.

  • "images" — Image file references (JPG, PNG, etc.)
  • "documents" — Document file references (PDF, DOCX, etc.)
  • "links" — Saved web page/link references
  • "mixed" — Collection containing multiple content types

FileCollection in Practice

Producing Results

Search agents create FileCollection objects and write them as typed output to the execution context. The collection carries the content type and source provenance.

python
from scopix.types.file_collection import FileCollection
from scopix.types.agent import AgentResult

# Create a collection from search results
collection = FileCollection.from_ids(
    ids=image_ids,               # ["img_1", "img_2", ..., "img_47"]
    content_type="images",       # Declares what's in the collection
    source_capability="SEARCH",  # Tracks provenance
)

# Write to context so downstream agents can read it
context.write("results", collection.to_dict(), "FILE_IDS", "search_agent")
return AgentResult(
    success=True,
    agent_name="SearchAgent",
    capability="search",
    summary=f"Found {collection.count} images",
)

Consuming Results

Downstream agents receive file IDs through the execution context. They read from typed slots and can use a FileResolver to load actual content on demand.

python
from scopix.types.agent import AgentResult
from scopix.types.file_collection import FileRef
from scopix.types.file_resolver import FileResolutionError

class AnalysisAgent:
    def __init__(self, resolver):
        self.resolver = resolver  # Inject FileResolver via constructor

    async def execute(self, context):
        # Read upstream results from context
        file_data = context.read("results", expected_type="FILE_IDS")
        image_ids = file_data.get("ids", [])
        if not image_ids:
            return AgentResult(
                success=False, agent_name="AnalysisAgent",
                capability="analysis", error="No images to analyze",
            )
        # Use FileResolver to load actual content when needed
        for image_id in image_ids[:10]:
            ref = FileRef(id=image_id, media_type="image/jpeg")
            try:
                image_bytes = await self.resolver.resolve(ref)
                # ... analyze image_bytes ...
            except FileResolutionError:
                continue  # Skip unresolvable refs

Combining FileCollections (Client-Side Fan-In)

Merging file references in your own code

On the server, the Pipeline API rejects configurations where two parallel parents produce the same content type into one downstream step: the shared per-pipeline state has only one slot per content type, so the second write would silently overwrite the first (last-write-wins). When you want that pattern, run the searches in separate pipelines and combine the results client-side using FileCollection.merge() or the DataTypeRegistry:

python
from scopix.types.data_types import get_default_registry
from scopix.types.file_collection import FileCollection

registry = get_default_registry()

# Merge two FileCollections using the registry
c1 = FileCollection.from_ids(["img-1", "img-2"], content_type="images")
c2 = FileCollection.from_ids(["img-3", "img-4"], content_type="images")
merged = registry.merge("FILE_IDS", c1, c2)
# merged.ids → ["img-1", "img-2", "img-3", "img-4"]
# merged.content_type → "images"

# Or directly:
merged = FileCollection.merge(c1, c2)

You can then feed the merged collection into a follow-up pipeline as seed data via .with_images(merged.ids).

Lazy Content Loading

IDs Travel, Data Stays

Agents pass lightweight FileCollection objects containing only IDs and metadata — not the actual file content. When an agent needs the real bytes (e.g. to analyze an image), it uses the FileResolver to load content on demand. This avoids bulk-loading hundreds of images through the graph.

python
# What flows through the graph: lightweight ID references
# SearchAgent output → AnalysisAgent input
{
    "ids": ["img_1", "img_2", ..., "img_47"],  # Just UUIDs
    "content_type": "images",
    "source_capability": "SEARCH",
}
# Total size: ~2 KB (just IDs)

# When AnalysisAgent needs actual image data:
ref = FileRef(id="img_1")
image_bytes = await resolver.resolve(ref)
# → loads from storage on demand: 500 KB per image

# Or enrich metadata without loading content:
enriched = await resolver.resolve_metadata(ref)
# enriched.filename == "sunset_beach.jpg"

Design Principles

Key Patterns

IDs travel, content stays

Agents pass lightweight FileCollection objects (just IDs). Content is loaded on-demand via FileResolver only when an agent actually needs the bytes.

Contracts drive wiring

Agents declare typed inputs/outputs via AgentContract. The server uses these contracts to populate each agent's typed inputs from the shared per-request state and route its outputs back into the same state — no hand-wired edges, no manual ID passing.

Content-aware routing

FILE_IDS slots carry a content_type ("images", "documents", "links"). Agents can filter or route based on what kind of files they receive.

Immutable data types

Most SDK data types (FileRef, AgentContract, Category, etc.) are frozen dataclasses. FileCollection enforces immutability via __slots__. Operations return new instances, preventing accidental mutation.
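The effect of frozen dataclasses can be seen with a stand-in. FrozenFileRef below mimics the shape of the SDK's FileRef for illustration; it is not the real class:

```python
from dataclasses import dataclass, replace, FrozenInstanceError

@dataclass(frozen=True)
class FrozenFileRef:  # stand-in mimicking the SDK's frozen FileRef
    id: str
    media_type: str = "application/octet-stream"

ref = FrozenFileRef(id="img-1", media_type="image/jpeg")
try:
    ref.id = "img-2"  # mutation is rejected at runtime
except FrozenInstanceError:
    print("refs are immutable")

# Operations return new instances instead of mutating in place:
updated = replace(ref, media_type="image/png")
print(ref.media_type, updated.media_type)  # → image/jpeg image/png
```

Frozen instances are also hashable by default, which is what lets refs be used safely as dictionary keys and set members while flowing through shared state.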

Structural typing

The Agent and FileResolver protocols use duck typing — no inheritance required. Any class with the right methods satisfies the protocol.
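The same structural-typing idea, shown with a minimal illustrative protocol (SupportsContract stands in for the SDK's Agent protocol; it is not the real definition):

```python
from typing import Protocol, runtime_checkable

@runtime_checkable
class SupportsContract(Protocol):  # illustrative stand-in for the Agent protocol
    def get_contract(self) -> dict: ...

class PlainClass:  # note: no inheritance from the protocol
    def get_contract(self) -> dict:
        return {"name": "PlainClass"}

# Any class with the right methods satisfies the protocol structurally:
print(isinstance(PlainClass(), SupportsContract))  # → True
```

Static checkers like mypy verify full signatures against the Protocol; `runtime_checkable` isinstance checks only confirm the methods exist.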