Agents
Chiltepin integrates with Academy Agents to support distributed agent-based workflows. Agents enable long-running, stateful computations that can be launched on remote resources and interacted with asynchronously through an agent_action-based API.
Note
Chiltepin’s agent system builds on Academy Agents to provide:
Automatic workflow lifecycle management: Agents manage their own Parsl workflow context
Simplified agent creation: Use the @chiltepin_agent decorator on regular Python classes
Runtime configuration: Pass workflow config, executors, and paths via manager.launch()
Serializable behavior: Decorated classes remain fully serializable for remote execution
For more information about Academy Agents, see the Academy documentation.
Note
Decorator Order:
The order of @agent_action and the chiltepin task decorators (@python_task, @bash_task, @join_task)
does not affect behavior—both orders are supported and tested. For consistency and readability, we recommend
using the task decorators (@python_task, @bash_task, @join_task) outermost and @agent_action
innermost (closest to the function), but either order will work.
Important
ChiltepinManager and AgentSystem only support agents decorated with @chiltepin_agent. Native
Academy agents (not decorated) are not supported and will raise an error if launched with ChiltepinManager.
Use the base Academy Manager for native agents.
Warning
Multi-Agent Deployments on Shared Filesystems:
When launching multiple agents on systems with shared filesystems (e.g., HPC clusters,
shared network storage), each agent needs its own agent_workflow_run_dir
to avoid Parsl directory collisions. If agent_workflow_run_dir is not
specified, a unique directory is auto-generated using a UUID, so collisions arise only
when the same path is used for more than one agent.
Recommended practice: Explicitly set a distinct agent_workflow_run_dir for each agent
in manager.launch() so that the paths are predictable when debugging:
agent1 = await manager.launch(
    MyAgent,
    agent_workflow_config=config,
    agent_workflow_run_dir="/scratch/agent1_runinfo",  # Unique path
)
agent2 = await manager.launch(
    MyAgent,
    agent_workflow_config=config,
    agent_workflow_run_dir="/scratch/agent2_runinfo",  # Different path
)
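One way to get run directories that are both recognizable and unique is to combine an agent label with a short random suffix. The sketch below is a standalone helper, not part of Chiltepin: make_run_dir, its parameters, and the /scratch base path are hypothetical names chosen for illustration.

```python
import uuid
from pathlib import PurePosixPath


def make_run_dir(base: str, label: str, unique: bool = True) -> str:
    """Build a per-agent run directory path (hypothetical helper).

    The label keeps the path recognizable when debugging; the optional
    UUID suffix keeps repeated launches from colliding.
    """
    name = f"{label}_runinfo"
    if unique:
        # uuid4().hex has 32 hex characters; 8 keep the path readable.
        name = f"{name}_{uuid.uuid4().hex[:8]}"
    return str(PurePosixPath(base) / name)


# One distinct path per agent.
run_dirs = [make_run_dir("/scratch", f"agent{i}") for i in (1, 2)]
```

Each resulting string could then be passed as agent_workflow_run_dir in its own manager.launch() call.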
Overview
Chiltepin provides five main components for agent-based workflows:
@chiltepin_agent: Decorator to wrap a regular Python class as an agent
@agent_action: Decorator to mark methods that should be exposed as agent actions (works with sync or async)
@agent_loop: Decorator to mark async methods that should run as background loops (must be async)
AgentSystem: Helper class to simplify Academy Manager setup with Parsl executors
ChiltepinManager: Custom Manager that supports workflow configuration parameters
When to Use Agents
Use agents when you need:
Long-running services: Agents that persist beyond a single task execution
Stateful computations: Maintaining state across multiple agent_action invocations
Background processing: Loops that update state while handling requests
Autonomous behavior: Agents that can make decisions and act without external prompts
Remote interaction: Asynchronous communication with computations on remote resources
For one-off tasks without shared state, use Tasks instead.
Important
Internet Access Requirement:
Agents must be deployed to resources with internet access because they communicate with the Academy Exchange server. This is a critical infrastructure requirement:
Agents typically use a localhost provider (on the local machine or a remote endpoint)
Compute nodes without internet access (common on HPC systems) cannot host agents
Tasks launched by agents can run anywhere, but the agents themselves need connectivity
Correct configuration (agent on internet-connected resource):
config = {
    "agent-executor": {
        "provider": "localhost",  # Runs on login node or local machine with internet
    }
}
Incorrect configuration (agent on compute node without internet):
# ❌ This will fail - compute nodes often lack internet access
config = {
    "agent-executor": {
        "provider": "slurm",  # Compute nodes typically cannot reach Exchange
    }
}
To deploy agents to remote HPC systems, use a Globus Compute endpoint with localhost provider running on a login node or other internet-connected resource.
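A quick pre-launch check can catch the misconfiguration described above before any agent is submitted. The following is a minimal sketch, not a Chiltepin API: executors_without_localhost is a hypothetical helper that assumes the dictionary layout used in the examples above. It only inspects the provider name and cannot verify actual network reachability.

```python
def executors_without_localhost(config: dict) -> list:
    """Return names of executors whose provider is not "localhost".

    Assumption: an executor with no "provider" key is flagged as well,
    because its placement cannot be determined from the config alone.
    """
    return [
        name
        for name, settings in config.items()
        if settings.get("provider") != "localhost"
    ]


config = {
    "agent-executor": {"provider": "localhost"},
    "compute": {"provider": "slurm"},
}
flagged = executors_without_localhost(config)
```

Only executors that host agents need to pass this check; executors that run tasks launched by agents may still use providers such as slurm.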
Behaviors vs Agents
Chiltepin uses a clean separation between behavior classes (domain logic) and agent classes (deployment wrappers). This design pattern provides flexibility, testability, and code reusability.
Recommended Pattern: Create Wrapper Classes
The recommended approach is to keep your behavior classes undecorated and create separate agent wrapper classes when you need to deploy them:
# 1. Define behavior class - pure domain logic, NO decorator
class WeatherModelBehavior:
    """Reusable behavior class for weather modeling."""

    def __init__(self, location: str):
        self.location = location
        self.temperature = 20.0

    @agent_action
    async def get_temperature(self) -> float:
        """Get current temperature - works standalone or as agent."""
        return self.temperature

    @agent_action
    async def set_temperature(self, temp: float) -> None:
        """Set temperature - works standalone or as agent."""
        self.temperature = temp

    @agent_loop
    async def update_temperature(self, shutdown):
        """Background loop - runs automatically when deployed as agent."""
        import asyncio, random
        while not shutdown.is_set():
            await asyncio.sleep(1)
            self.temperature += random.uniform(-2, 2)

# 2. Create agent wrapper for deployment - just add decorator
@chiltepin_agent()
class WeatherModelAgent(WeatherModelBehavior):
    """Agent deployment wrapper for WeatherModelBehavior."""
    pass  # Inherits all behavior, decorator enables agent deployment
Benefits of This Pattern:
Testability: Test behavior logic without agent infrastructure
# Test behavior directly
async def test_weather_behavior():
    behavior = WeatherModelBehavior("Boulder, CO")
    await behavior.set_temperature(25.0)
    assert await behavior.get_temperature() == 25.0
Flexibility: Use behaviors with or without agents
async def example():
    # Use as standalone behavior
    local_weather = WeatherModelBehavior("Local")
    temp = await local_weather.get_temperature()

    # Deploy as remote agent
    remote_weather = await manager.launch(
        WeatherModelAgent,
        args=("Remote",),
        ...
    )
Hierarchy Freedom: Build complex behavior hierarchies, then wrap any level
class BaseBehavior:
    @agent_action
    async def base_method(self):
        pass

class SpecializedBehavior(BaseBehavior):
    @agent_action
    async def specialized_method(self):
        pass

# Deploy the specialized behavior as an agent
@chiltepin_agent()
class SpecializedAgent(SpecializedBehavior):
    pass
Separation of Concerns: Behavior = “what it does”, Agent = “where/how it runs”
Important
Understanding @agent_loop Methods:
Methods decorated with @agent_loop are lifecycle hooks that run automatically
as background tasks when the behavior is deployed as an agent. They rely on the Agent
infrastructure to:
Provide the shutdown event
Schedule them as background tasks
Manage their lifecycle (startup/shutdown)
This means:
✅ @agent_action methods work on standalone behaviors AND agents
⚠️ @agent_loop methods are ordinary async methods, but they are only started automatically and lifecycle-managed when deployed as agents
🔍 If your behavior has loops, it’s generally designed for agent deployment
import asyncio

# Behavior with loop - designed for agent-managed execution
class MonitorBehavior:
    @agent_loop
    async def heartbeat(self, shutdown):
        """Runs automatically when managed by an agent runtime."""
        while not shutdown.is_set():
            await asyncio.sleep(1)
            print("heartbeat")

# Standalone instance - the loop exists, but it is not started automatically
behavior = MonitorBehavior()  # ⚠️ heartbeat will not auto-start here

# Deploy as agent - loops activate automatically
@chiltepin_agent()
class MonitorAgent(MonitorBehavior):
    pass

agent = await manager.launch(MonitorAgent, ...)  # ✅ heartbeat starts automatically
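Because a loop is an ordinary async method, a test can drive it by hand: create the shutdown event, start the loop as a task, and set the event to stop it. The sketch below uses only the standard library. CounterBehavior and run_loop_briefly are hypothetical names, and the @agent_loop decorator is omitted so that the example runs without Chiltepin installed; whether the decorated method can be called in exactly this way is an assumption based on the description above.

```python
import asyncio


class CounterBehavior:
    """Stand-in behavior; in real code `tick` would carry @agent_loop."""

    def __init__(self) -> None:
        self.count = 0

    async def tick(self, shutdown: asyncio.Event) -> None:
        while not shutdown.is_set():
            await asyncio.sleep(0.01)
            self.count += 1


async def run_loop_briefly(behavior: CounterBehavior, seconds: float) -> int:
    """Start the loop as a task, let it run, then signal shutdown."""
    shutdown = asyncio.Event()
    task = asyncio.create_task(behavior.tick(shutdown))
    await asyncio.sleep(seconds)
    shutdown.set()  # ask the loop to exit at its next check
    await task      # wait for a clean exit
    return behavior.count


final_count = asyncio.run(run_loop_briefly(CounterBehavior(), 0.1))
```

This plays the role that the agent runtime plays in a real deployment, which is why standalone behaviors with loops need this extra scaffolding in tests.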
Alternative Pattern: Direct Decoration
You can also apply @chiltepin_agent directly to a class if you don’t need the
behavior/agent separation:
@chiltepin_agent()
class SimpleAgent:
    """Combined behavior and agent - less flexible but more concise."""

    def __init__(self, value: int):
        self.value = value

    @agent_action
    async def get_value(self) -> int:
        return self.value
This is simpler for small, single-purpose agents but sacrifices testability and reusability. Use the wrapper pattern when building reusable components or complex behavior hierarchies.
Basic Usage
Creating an Agent
Recommended: Behavior + Agent Wrapper Pattern
Create an undecorated behavior class, then wrap it for deployment:
from chiltepin.agents import chiltepin_agent, agent_action, agent_loop
from chiltepin.tasks import python_task

# 1. Define behavior class (undecorated, reusable)
class WeatherModelBehavior:
    """Reusable weather model behavior."""

    def __init__(self, temperature: float):
        self.temperature = temperature

    @python_task
    @agent_action
    def forecast(self) -> str:
        """Generate a forecast based on current temperature."""
        import random
        conditions = ["sunny", "cloudy", "rainy"]
        return f"{random.choice(conditions)} at {self.temperature}°C"

    @agent_action
    async def get_temperature(self) -> float:
        """Get the current temperature."""
        return self.temperature

    @agent_loop
    async def update_temperature(self, shutdown):
        """Background loop - runs automatically when deployed as agent."""
        import asyncio
        import random
        while not shutdown.is_set():
            await asyncio.sleep(1)
            self.temperature += random.uniform(-2, 2)

# 2. Create agent wrapper (decorated, for deployment)
@chiltepin_agent(agent_workflow_include=["compute"])
class WeatherModelAgent(WeatherModelBehavior):
    """Agent deployment wrapper."""
    pass  # Inherits all behavior
Quick Pattern: Direct Decoration
For simple, single-purpose agents, you can decorate directly:
@chiltepin_agent(agent_workflow_include=["compute"])
class SimpleWeatherAgent:
    """Combined behavior and agent."""

    def __init__(self, temperature: float):
        self.temperature = temperature

    @agent_action
    async def get_temperature(self) -> float:
        return self.temperature
Key Features
Regular Python classes: No special inheritance, fully serializable
Access instance state: Task-decorated methods can access self.temperature
Mixed sync/async: Use @agent_action on both sync and async methods
Background loops: Use @agent_loop on async methods - automatically started and managed when deployed as agents
Infrastructure separation: Workflow config passed via manager.launch(), not __init__
Testable behaviors: Behaviors can be tested standalone, agents deployed remotely
Note
@agent_loop methods only run automatically in agents: Background loops require the Agent
infrastructure to provide the shutdown event and manage their lifecycle.
If you call methods on a standalone behavior instance, loops will not execute automatically.
Only when you deploy the behavior as an agent using @chiltepin_agent and
manager.launch() will the loops activate automatically.
Note
@agent_loop requires async methods: The @agent_loop decorator can only be applied to
async methods. This is validated at decoration time, and a TypeError will be raised if you
attempt to use it on a synchronous method. Background loops must be async to properly
cooperate with the agent’s event loop.
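Validation at decoration time means the error appears when the class body is executed, not when the agent is launched. The sketch below shows the general technique with a hypothetical decorator named require_async. It is not Chiltepin's implementation, and the marker attribute _is_loop is invented for illustration.

```python
import inspect


def require_async(func):
    """Hypothetical decorator: reject non-async functions at decoration time."""
    if not inspect.iscoroutinefunction(func):
        raise TypeError(
            f"{func.__name__} must be defined with 'async def' to be used as a loop"
        )
    func._is_loop = True  # illustrative marker attribute
    return func


class Good:
    @require_async
    async def heartbeat(self, shutdown):
        pass


try:
    class Bad:
        @require_async
        def heartbeat(self, shutdown):  # sync method: rejected immediately
            pass
except TypeError as exc:
    error_message = str(exc)
```

The TypeError is raised while the class statement for Bad is still being evaluated, so a misdeclared loop fails at import time and never reaches a remote resource.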
Launching Agents
Use AgentSystem to create a manager and launch agents:
from chiltepin import Workflow, AgentSystem

async def main():
    # Configuration for the manager's workflow (where agents run)
    manager_config = {
        "manager-executor": {
            "endpoint": ENDPOINT_UUID,
            "provider": "localhost",
        }
    }

    # Configuration for the agent's internal workflow (where tasks run)
    agent_config = {
        "compute": {
            "provider": "slurm",
            "partition": "compute",
            # ... other config
        }
    }

    # Start workflow for hosting agents
    workflow = Workflow(manager_config, include=["manager-executor"])
    workflow.start()

    # Create agent system
    agent_system = AgentSystem(
        workflow=workflow,
        executor_names=["manager-executor"],
    )

    # Launch and interact with agent
    async with await agent_system.manager() as manager:
        model = await manager.launch(
            WeatherModelAgent,  # Agent wrapper class
            agent_workflow_config=agent_config,  # Agent's workflow config
            agent_workflow_include=["compute"],  # Which executors to use
            args=(25.0,),  # Arguments for __init__
            executor="manager-executor"  # Where to run the agent
        )

        # Call agent actions
        temp = await model.get_temperature()
        forecast = await model.forecast(executor=["compute"])

    workflow.cleanup()
Runtime Configuration
Infrastructure concerns (workflow config, executors, directories) are passed to
manager.launch() rather than the behavior class:
model = await manager.launch(
    WeatherModelAgent,  # Agent wrapper class
    agent_workflow_config=agent_config,  # Workflow configuration dict or YAML path
    agent_workflow_include=["compute"],  # List of executors to include (None = all)
    agent_workflow_run_dir="/custom/path",  # Directory for Parsl runtime files
    args=(25.0,),  # Behavior arguments (domain logic)
    kwargs={"units": "C"},  # Behavior keyword arguments
    executor="manager-executor"  # Agent executor (where agent runs)
)
This separation keeps behavior classes focused on domain logic:
# Behavior class - pure domain logic
class WeatherModelBehavior:
    def __init__(self, temperature: float, units: str = "C"):
        # Only domain logic, no infrastructure concerns
        self.temperature = temperature
        self.units = units

# Agent wrapper - minimal, just deployment
@chiltepin_agent()
class WeatherModelAgent(WeatherModelBehavior):
    """Deploy WeatherModelBehavior as an agent."""
    pass
Decorator Parameters
The @chiltepin_agent decorator accepts default values that can be overridden at runtime:
# Behavior with domain logic
class MyBehavior:
    @agent_action
    async def do_work(self):
        pass

# Agent wrapper with defaults
@chiltepin_agent(agent_workflow_include=["default-compute"], agent_workflow_run_dir="./runs")
class MyAgent(MyBehavior):
    pass

# Use decorator defaults
agent1 = await manager.launch(MyAgent, agent_workflow_config=cfg)

# Override at runtime - more flexible
agent2 = await manager.launch(
    MyAgent,
    agent_workflow_config=cfg,
    agent_workflow_include=["special-compute"],  # Overrides decorator default
    agent_workflow_run_dir="/tmp/runs"  # Overrides decorator default
)
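The precedence rule can be pictured as a dictionary merge in which launch-time arguments are applied last. This is only a mental model written as code: resolve_launch_options is a hypothetical function, and Chiltepin's internal handling (for instance, of an explicit None) may differ.

```python
def resolve_launch_options(decorator_defaults: dict, launch_kwargs: dict) -> dict:
    """Merge options so that explicitly passed launch arguments win."""
    return {**decorator_defaults, **launch_kwargs}


defaults = {
    "agent_workflow_include": ["default-compute"],
    "agent_workflow_run_dir": "./runs",
}

# Nothing passed at launch: the decorator defaults apply unchanged.
opts1 = resolve_launch_options(defaults, {})

# One option overridden at launch: only that key changes.
opts2 = resolve_launch_options(
    defaults, {"agent_workflow_include": ["special-compute"]}
)
```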
Action Decorators
Use @agent_action to expose methods as agent actions. The decorator works with both
synchronous and asynchronous methods:
Synchronous Actions
from chiltepin.agents import chiltepin_agent, agent_action
from chiltepin.tasks import python_task

@chiltepin_agent()
class DataProcessor:
    @python_task
    @agent_action
    def process_data(self, data: str) -> str:
        """Synchronous task-decorated method."""
        return data.upper()

    @agent_action
    def get_config(self) -> dict:
        """Synchronous helper method."""
        return {"version": "1.0"}
Asynchronous Actions
@chiltepin_agent()
class AsyncService:
    @agent_action
    async def fetch_data(self, url: str) -> str:
        """Async method using httpx, aiohttp, etc."""
        import httpx  # ✅ Import inside method for serializability

        # ✅ Create client temporarily, don't store in self.__init__
        async with httpx.AsyncClient() as client:
            response = await client.get(url)
            return response.text
Task-Decorated Actions
When using chiltepin task decorators (@python_task, @bash_task, @join_task) with @agent_action,
the order does not matter and both are supported:
@chiltepin_agent()
class Computer:
    @python_task
    @agent_action
    def compute(self, x: int) -> int:
        return x ** 2
This allows the task to access instance state (self) while still executing remotely.
Note
Helper Methods and Internal Implementation:
Methods without @agent_action or @agent_loop decorators are not exposed on the
agent proxy, but they remain accessible to action methods internally via self.
This is by design and enables clean separation of public API from internal implementation.
@chiltepin_agent()
class MyAgent:
    def _internal_helper(self, x: int) -> int:
        """Not exposed - but actions can call it."""
        return x * 2

    @agent_action
    async def public_action(self, x: int) -> int:
        """Exposed action that uses helper."""
        # This works! Actions execute on the behavior instance
        return self._internal_helper(x) + 10

# From outside:
agent = await manager.launch(MyAgent, ...)
result = await agent.public_action(5)  # ✅ Returns 20
# agent._internal_helper(5)  # ❌ AttributeError - not exposed
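The exposure rule can be illustrated with a toy proxy that forwards only tagged methods. This is a conceptual sketch with synchronous methods for brevity. mark_action and Proxy are hypothetical stand-ins and do not reflect how Academy or Chiltepin implement agent handles.

```python
def mark_action(func):
    """Hypothetical stand-in for an action decorator: tags the function."""
    func._exposed = True
    return func


class Proxy:
    """Toy proxy that forwards only tagged methods to the wrapped object."""

    def __init__(self, target):
        self._target = target

    def __getattr__(self, name):
        # Look the name up on the class so instance data is never exposed.
        attr = getattr(type(self._target), name, None)
        if not getattr(attr, "_exposed", False):
            raise AttributeError(f"{name!r} is not an exposed action")
        return getattr(self._target, name)


class Doubler:
    def _internal_helper(self, x: int) -> int:
        return x * 2

    @mark_action
    def public_action(self, x: int) -> int:
        # Runs on the real instance, so the helper is reachable via self.
        return self._internal_helper(x) + 10


proxy = Proxy(Doubler())
result = proxy.public_action(5)
try:
    proxy._internal_helper(5)
    helper_blocked = False
except AttributeError:
    helper_blocked = True
```

The action still reaches the helper because it executes on the wrapped instance; only callers going through the proxy are restricted.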
Loop Decorators
Use @agent_loop to create background tasks that run continuously:
from chiltepin.agents import chiltepin_agent, agent_loop
import asyncio

@chiltepin_agent()
class Monitor:
    def __init__(self):
        self.status = "initializing"
        self.count = 0

    @agent_loop
    async def heartbeat(self, shutdown: asyncio.Event):
        """Background agent_loop that runs until agent shuts down."""
        self.status = "running"
        while not shutdown.is_set():
            await asyncio.sleep(1)
            self.count += 1
            if self.count % 10 == 0:
                print(f"Heartbeat: {self.count}")
        self.status = "stopped"
The shutdown event is provided automatically and signals when the agent is shutting down.
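A loop that sleeps for a fixed interval notices shutdown only after the sleep finishes. For long intervals, one option is to wait on the event itself with a timeout so that the loop wakes as soon as shutdown is requested. The sketch below uses only the standard library; periodic and demo are hypothetical names, and in an agent the loop body would live inside an @agent_loop method.

```python
import asyncio


async def periodic(shutdown: asyncio.Event, interval: float, work) -> None:
    """Call work() every `interval` seconds until shutdown is set."""
    while not shutdown.is_set():
        work()
        try:
            # Returns as soon as shutdown is set; otherwise times out.
            await asyncio.wait_for(shutdown.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a shutdown request


async def demo() -> int:
    calls = []
    shutdown = asyncio.Event()
    task = asyncio.create_task(periodic(shutdown, 60.0, lambda: calls.append(1)))
    await asyncio.sleep(0.05)  # let the first iteration run
    shutdown.set()             # loop wakes at once despite the 60 s interval
    await asyncio.wait_for(task, timeout=1.0)
    return len(calls)


call_count = asyncio.run(demo())
```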
AgentSystem Helper
The AgentSystem class simplifies setup by wrapping the complexity of creating
an Academy Manager with ParslPoolExecutors:
from chiltepin import Workflow, AgentSystem

# Without AgentSystem (manual setup)
from academy.manager import Manager
from academy.exchange.cloud.client import HttpExchangeFactory
from parsl.concurrent import ParslPoolExecutor

executors = {
    "my-exec": ParslPoolExecutor(dfk=workflow.dfk, executors=["my-exec"])
}
async with await Manager.from_exchange_factory(
    factory=HttpExchangeFactory(
        "https://exchange.academy-agents.org",
        auth_method="globus"
    ),
    executors=executors
) as manager:
    # Use manager
    pass

# With AgentSystem (simplified)
agent_system = AgentSystem(
    workflow=workflow,
    executor_names=["my-exec"],
)
async with await agent_system.manager() as manager:
    # Use manager - ChiltepinManager with config/include/run_dir support
    pass
ChiltepinManager
ChiltepinManager is a custom Manager subclass that intercepts launch()
to support Chiltepin-specific parameters (agent_workflow_config, agent_workflow_include, agent_workflow_run_dir).
It’s created automatically by AgentSystem.manager().
You can also create it directly:
from chiltepin.agents import ChiltepinManager
from academy.exchange.cloud.client import HttpExchangeFactory

async with await ChiltepinManager.from_exchange_factory(
    factory=HttpExchangeFactory(
        "https://exchange.academy-agents.org",
        auth_method="globus"
    ),
    executors=my_executors
) as manager:
    agent = await manager.launch(
        MyAgent,
        agent_workflow_config=agent_config,
        agent_workflow_include=["compute"]
    )
Agent Composition
Agents can coordinate with other agents, enabling hierarchical or collaborative workflows. This pattern is useful when you want to:
Decompose complex tasks: Break functionality into specialized agent components
Coordinate workflows: Have a coordinator agent orchestrate multiple worker agents
Build agent hierarchies: Create supervisor agents that manage subordinate agents
Enable collaboration: Allow agents to request services from other agents
Note
Remember: All agents require internet access to communicate with the Exchange server. They must use providers (like localhost) that run on internet-connected resources.
Basic Composition Pattern
Pass agent handles as initialization parameters to create composition relationships:
from chiltepin.agents import chiltepin_agent, agent_action

@chiltepin_agent()
class LowererAgent:
    """Agent that converts text to lowercase."""

    @agent_action
    async def lower(self, text: str) -> str:
        """Convert text to lowercase."""
        return text.lower()

@chiltepin_agent()
class ReverserAgent:
    """Agent that reverses text."""

    @agent_action
    async def reverse(self, text: str) -> str:
        """Reverse text."""
        return text[::-1]

@chiltepin_agent()
class CoordinatorAgent:
    """Agent that coordinates other agents to process text."""

    def __init__(self, lowerer, reverser):
        """Initialize with handles to other agents.

        Parameters
        ----------
        lowerer : Handle[LowererAgent]
            Handle to the lowerer agent
        reverser : Handle[ReverserAgent]
            Handle to the reverser agent
        """
        self.lowerer = lowerer
        self.reverser = reverser

    @agent_action
    async def process(self, text: str) -> str:
        """Process text by lowering then reversing it.

        This demonstrates agent composition - calling actions on other agents.
        """
        # Call action on lowerer agent
        text = await self.lowerer.lower(text)
        # Call action on reverser agent
        text = await self.reverser.reverse(text)
        return text
Launching Composed Agents
Launch worker agents first, then pass their handles to the coordinator.
Example: All agents on the same endpoint:
from chiltepin import Workflow, AgentSystem

async def main():
    # Configuration: all 3 agents share one endpoint → need 3 workers
    config = {
        "agent-executor": {
            "provider": "localhost",  # Local machine or remote endpoint
            "cores_per_node": 1,
            "max_workers_per_node": 3,  # CRITICAL: 3 concurrent agents = 3 workers
        }
    }
    workflow = Workflow(config)
    workflow.start()

    agent_system = AgentSystem(
        workflow=workflow,
        executor_names=["agent-executor"]
    )

    async with await agent_system.manager() as manager:
        # All three agents deployed to the same executor
        lowerer = await manager.launch(
            LowererAgent,
            agent_workflow_config=config,
            executor="agent-executor"  # Occupies worker 1
        )
        reverser = await manager.launch(
            ReverserAgent,
            agent_workflow_config=config,
            executor="agent-executor"  # Occupies worker 2
        )

        # Launch coordinator agent with handles to workers
        coordinator = await manager.launch(
            CoordinatorAgent,
            agent_workflow_config=config,
            args=(lowerer, reverser),  # Pass agent handles
            executor="agent-executor"  # Occupies worker 3
        )

        # Use the coordinator - it will call lowerer and reverser
        result = await coordinator.process("Hello World")
        print(f"Result: {result}")  # Output: "dlrow olleh"

    workflow.cleanup()
Warning
Critical: Deadlock Risk with Insufficient Workers
Agent composition requires careful resource provisioning. Each agent occupies a worker slot on the endpoint/provider where it runs, and if agents call each other, you must provision enough workers for all agents to run concurrently or you will experience deadlock.
Scenario 1: Multiple agents on the same endpoint/provider
In the example above, all 3 agents run on the same localhost provider:
3 agents (coordinator, lowerer, reverser) need to run concurrently
The coordinator calls actions on lowerer and reverser
All 3 agents must be active simultaneously
Therefore, max_workers_per_node must be at least 3
# All agents share the same provider → need 3 workers
config = {
    "agent-executor": {
        "provider": "localhost",  # or endpoint UUID for remote localhost
        "max_workers_per_node": 3,  # REQUIRED: At least one per concurrent agent
    }
}
With insufficient workers: If you set max_workers_per_node=1, the first agent to be launched
occupies the only worker, then waits for other agents to start or respond, but they can’t
start because no workers are available → deadlock.
Scenario 2: Each agent on a different endpoint (advanced)
Alternatively, you can deploy each agent to a different endpoint with max_workers_per_node=1:
# Each agent gets its own endpoint → 1 worker per endpoint is sufficient
config = {
    "coordinator-executor": {
        "endpoint": "coordinator-endpoint-uuid",
        "provider": "localhost",
        "max_workers_per_node": 1,  # Only runs coordinator
    },
    "lowerer-executor": {
        "endpoint": "lowerer-endpoint-uuid",
        "provider": "localhost",
        "max_workers_per_node": 1,  # Only runs lowerer
    },
    "reverser-executor": {
        "endpoint": "reverser-endpoint-uuid",
        "provider": "localhost",
        "max_workers_per_node": 1,  # Only runs reverser
    },
}

# Launch each agent to its dedicated executor (inside async with manager context)
async with await agent_system.manager() as manager:
    lowerer = await manager.launch(LowererAgent, executor="lowerer-executor", ...)
    reverser = await manager.launch(ReverserAgent, executor="reverser-executor", ...)
    coordinator = await manager.launch(CoordinatorAgent, executor="coordinator-executor", ...)
This approach avoids worker contention but requires multiple endpoints with internet access.
Best practice:
Single endpoint: Set max_workers_per_node ≥ number of concurrent agents
Multiple endpoints: Each agent can use its own endpoint with max_workers_per_node=1
Hybrid: Mix approaches based on your infrastructure (e.g., similar agents share endpoints)
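The provisioning rule can be checked before launch by counting how many agents are planned for each executor. The following is a minimal sketch, not a Chiltepin feature: underprovisioned and the placements mapping are hypothetical, the check assumes a single node per executor, and it assumes that a missing max_workers_per_node should be treated as 1.

```python
from collections import Counter


def underprovisioned(config: dict, placements: dict) -> dict:
    """Map executor name -> (agents placed, workers available) for shortfalls.

    `placements` maps an agent label to the executor it will be launched on.
    """
    agents_per_executor = Counter(placements.values())
    problems = {}
    for executor, needed in agents_per_executor.items():
        # Assumption: treat a missing setting as a single worker.
        available = config[executor].get("max_workers_per_node", 1)
        if needed > available:
            problems[executor] = (needed, available)
    return problems


config = {"agent-executor": {"provider": "localhost", "max_workers_per_node": 1}}
placements = {
    "lowerer": "agent-executor",
    "reverser": "agent-executor",
    "coordinator": "agent-executor",
}
report = underprovisioned(config, placements)
```

An empty result means every executor has at least one worker per planned agent; a non-empty result lists the executors at risk of the deadlock described above.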
Best Practices
Import Decorators Correctly
Always use Chiltepin’s decorators (agent_action and agent_loop), not Academy’s native
decorators (action and loop):
# ✅ Correct - Use Chiltepin's decorators
from chiltepin.agents import chiltepin_agent, agent_action, agent_loop
# ❌ Wrong - Academy's decorators have different names and semantics
from academy.agent import action, loop
Academy’s @action requires async methods, while Chiltepin’s @agent_action works with both sync and async.
Keep Behavior Classes Serializable
Since agents can run remotely, behavior classes must be serializable:
@chiltepin_agent()
class GoodAgent:
    def __init__(self, value: int):
        self.value = value  # ✅ Serializable types

    @agent_action
    @python_task
    def compute(self):
        # ✅ Import modules inside methods for remote execution
        import numpy as np
        return np.array([self.value])

@chiltepin_agent()
class BadAgent:
    def __init__(self, value: int):
        import numpy as np  # ❌ Don't import a module just to keep a reference to it
        self.np = np  # ❌ Modules may not serialize
        self.value = value
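A rough local check for this kind of problem is to try pickling each instance attribute. The sketch below uses the standard pickle module as an approximation; the serializer used in practice for remote execution may accept more or fewer objects, so treat the result as a hint only. unpicklable_attributes is a hypothetical helper, and SimpleNamespace stands in for a behavior instance.

```python
import math
import pickle
from types import SimpleNamespace


def unpicklable_attributes(obj) -> list:
    """Return the names of instance attributes that pickle cannot serialize."""
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value)
        except Exception:
            bad.append(name)
    return bad


# SimpleNamespace stands in for a behavior instance's attribute set.
good_state = SimpleNamespace(value=3)
bad_state = SimpleNamespace(value=3, np=math)  # a module stored on the instance

good_report = unpicklable_attributes(good_state)
bad_report = unpicklable_attributes(bad_state)
```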
Separate Infrastructure from Logic
Pass infrastructure concerns via manager.launch():
@chiltepin_agent()
class MyAgent:
    def __init__(self, threshold: float):  # ✅ Domain parameters only
        self.threshold = threshold

    # ❌ Bad - mixing infrastructure with domain logic
    # def __init__(self, threshold: float, workflow_config: dict):
    #     self.threshold = threshold
    #     self.config = workflow_config
Use Type Hints
Type hints improve code clarity and enable better IDE support:
from typing import List

@chiltepin_agent()
class TypedAgent:
    def __init__(self, values: List[float]):
        self.values = values

    @agent_action
    @python_task
    def mean(self) -> float:
        return sum(self.values) / len(self.values)
Agent Inheritance
Agents support inheritance, but only undecorated classes can be extended. Apply
@chiltepin_agent only to the final “leaf” class in your hierarchy, not to intermediate
parent classes. When you decorate a class, it becomes an Agent wrapper and cannot be
extended further.
Important
Design for Extensibility:
If you want to create reusable agent behavior that others can extend, do not apply
the decorator to the base class. Leave base classes undecorated and let users apply
@chiltepin_agent to their own specialized subclasses.
Why This Limitation Exists:
The @chiltepin_agent decorator dynamically creates and returns an Agent subclass
wrapper around your behavior class, while preserving the original class name on the
decorated result. In other words, after decoration, MyAgent is no longer the original
undecorated behavior class, even though it still appears under the name MyAgent. This means:
❌ Cannot extend decorated agents: Subclassing a decorated agent gives you an Agent, not a behavior class
✅ Can extend undecorated behaviors: Subclass undecorated classes, then decorate the child
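The effect of a class decorator that returns a new wrapper class under the old name can be reproduced in a few lines. The sketch below is a toy model of the description above, not Chiltepin's code: FakeAgentBase and fake_agent_decorator are hypothetical, and the real wrapper does considerably more.

```python
class FakeAgentBase:
    """Hypothetical stand-in for an agent base class."""


def fake_agent_decorator(behavior_cls):
    """Return a NEW class named like behavior_cls that wraps, not extends, it."""

    def __init__(self, *args, **kwargs):
        # The wrapper holds a behavior instance instead of being one.
        self.behavior = behavior_cls(*args, **kwargs)

    return type(behavior_cls.__name__, (FakeAgentBase,), {"__init__": __init__})


class Original:
    def greet(self) -> str:
        return "hi"


Decorated = fake_agent_decorator(Original)


class Child(Decorated):  # subclasses the wrapper, not Original
    pass
```

In this model Decorated keeps the name Original but is not a subclass of it, so Child inherits nothing from the behavior class. That mirrors why extending a decorated agent does not extend its behavior.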
Recommended Pattern:
# Base behavior class - NOT decorated (designed for extension)
class ReusableAgentBehavior:
    """Base class providing common agent functionality.

    Leave this undecorated so others can extend it.
    """

    def __init__(self, base_value: int):
        self.base_value = base_value

    @agent_action
    async def get_value(self) -> int:
        """Common action available to all subclasses."""
        return self.base_value

    @agent_action
    async def increment(self) -> None:
        """Common mutation."""
        self.base_value += 1

    def _helper(self) -> str:
        """Private helper - not exposed."""
        return "helper"

# User extends and decorates their specific implementation
@chiltepin_agent()
class MySpecializedAgent(ReusableAgentBehavior):
    """My custom agent extending the reusable base.

    Apply decorator here - this is my final implementation.
    """

    def __init__(self, base_value: int, name: str):
        super().__init__(base_value)
        self.name = name

    @agent_action
    async def get_name(self) -> str:
        """My custom action."""
        return self.name

    @agent_action
    async def get_full_status(self) -> dict:
        """Combines base and specialized behavior."""
        return {
            "name": self.name,
            "value": self.base_value
        }
When launched, MySpecializedAgent exposes all actions from both the base class
(get_value, increment) and the specialized class (get_name, get_full_status).
What NOT to Do:
# ❌ DON'T: Decorating the base class
@chiltepin_agent()  # ← Prevents others from extending this
class MyAgent:
    @agent_action
    async def action1(self):
        pass

# This WON'T WORK - MyAgent is now an Agent subclass, not a behavior
class ExtendedAgent(MyAgent):  # ← Subclassing an Agent, not the behavior
    @agent_action
    async def action2(self):
        pass  # Won't be discovered correctly
If you’ve already decorated a class and need to extend it, create a new undecorated base class with the shared behavior and decorate new leaf classes.
Multiple Inheritance (Mixins):
The pattern works well with mixins - create undecorated mixin classes and combine them in a decorated final class:
# Undecorated mixins - reusable across agents
class StorageMixin:
    @agent_action
    async def save(self, data: str) -> str:
        return f"saved: {data}"

class ProcessingMixin:
    @agent_action
    async def process(self, data: str) -> str:
        return data.upper()

# Combine mixins in your final decorated agent
@chiltepin_agent()
class MyAgent(StorageMixin, ProcessingMixin):
    @agent_action
    async def custom_action(self):
        pass
All methods from all mixins (save, process, custom_action) will be exposed.
Summary:
📦 Create libraries: Provide undecorated base classes and mixins
🎯 Use libraries: Extend undecorated classes, add your methods, then decorate
🚫 Never decorate: Intermediate classes that others might extend
Complete Example
Here’s a complete example using the recommended behavior/agent wrapper pattern:
import asyncio
import logging
from chiltepin import Workflow, AgentSystem
from chiltepin.agents import chiltepin_agent, agent_action, agent_loop
from chiltepin.tasks import python_task

logger = logging.getLogger(__name__)

# 1. Define behavior class - pure domain logic, reusable, testable
class TemperatureModelBehavior:
    """Reusable temperature model behavior."""

    def __init__(self, initial_temp: float, location: str):
        self.temperature = initial_temp
        self.location = location
        self.forecast_count = 0

    @agent_action
    @python_task
    def forecast(self) -> str:
        """Generate forecast using current temperature."""
        import random
        conditions = ["sunny", "cloudy", "rainy", "snowy"]
        condition = random.choice(conditions)
        self.forecast_count += 1
        return f"{self.location}: {condition}, {self.temperature:.1f}°C"

    @agent_action
    async def get_stats(self) -> dict:
        """Get current statistics."""
        return {
            "temperature": self.temperature,
            "location": self.location,
            "forecasts_generated": self.forecast_count
        }

    @agent_action
    async def set_temperature(self, temp: float) -> None:
        """Manually update temperature."""
        self.temperature = temp

    @agent_loop
    async def update_temperature(self, shutdown: asyncio.Event):
        """Simulate temperature changes - runs automatically when deployed as agent."""
        import asyncio
        import random
        while not shutdown.is_set():
            await asyncio.sleep(2)
            # Random walk
            self.temperature += random.uniform(-1, 1)
            # Keep reasonable bounds
            self.temperature = max(-50, min(50, self.temperature))

# 2. Create agent wrapper for deployment
@chiltepin_agent(agent_workflow_include=["compute"])
class TemperatureModelAgent(TemperatureModelBehavior):
    """Agent deployment wrapper for TemperatureModelBehavior."""
    pass  # Inherits all behavior

async def main():
    # Manager workflow configuration
    manager_config = {
        "manager-executor": {
            "endpoint": "your-endpoint-uuid",
            "provider": "localhost",
        }
    }

    # Agent workflow configuration
    agent_config = {
        "compute": {
            "provider": "slurm",
            "partition": "compute",
            "cores_per_node": 48,
            "walltime": "01:00:00",
        }
    }

    # Start manager workflow
    workflow = Workflow(manager_config, include=["manager-executor"])
    workflow.start()

    # Create agent system
    agent_system = AgentSystem(
        workflow=workflow,
        executor_names=["manager-executor"],
    )

    async with await agent_system.manager() as manager:
        # Launch agent wrapper with runtime configuration
        model = await manager.launch(
            TemperatureModelAgent,  # Deploy the agent wrapper
            agent_workflow_config=agent_config,
            agent_workflow_include=["compute"],
            args=(20.0, "Boulder, CO"),
            executor="manager-executor"
        )

        # Interact with agent
        logger.info("Getting initial stats...")
        stats = await model.get_stats()
        logger.info(f"Stats: {stats}")

        logger.info("Generating forecast...")
        forecast = await model.forecast(executor=["compute"])
        logger.info(f"Forecast: {forecast}")

        logger.info("Waiting for temperature updates...")
        await asyncio.sleep(5)
        stats = await model.get_stats()
        logger.info(f"Updated stats: {stats}")

        logger.info("Setting temperature manually...")
        await model.set_temperature(25.0)
        forecast = await model.forecast(executor=["compute"])
        logger.info(f"New forecast: {forecast}")

    workflow.cleanup()
    logger.info("Done!")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
Troubleshooting
Serialization Errors
If you get serialization errors when launching agents:
Check that behavior class doesn’t inherit from non-serializable classes
Move imports inside methods rather than at class level
Avoid storing non-serializable objects (file handles, connections) in self
Action Not Found
If an agent_action isn’t available on the agent proxy:
Check that the method is decorated with @agent_action
Verify you’re using agent_action from chiltepin.agents, not action from academy.agent
Ensure the method name doesn’t start with underscore (private methods aren’t exposed)
Workflow Not Starting
If the agent’s internal workflow doesn’t start:
Check that agent_workflow_config is passed to manager.launch()
Verify the configuration dict is valid (see Configuration)
Check that the agent_workflow_include parameter matches actual executor names in the config
Check that requested executors are available and can start (e.g. Slurm partition is correct)
Check that resources are available (e.g. Slurm queue isn’t full)
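The name-matching check in the list above is easy to automate. The sketch below is a hypothetical helper, not a Chiltepin function; it assumes the configuration is a dict keyed by executor name and that None means "include every executor", as noted under Runtime Configuration.

```python
from typing import List, Optional


def missing_executors(config: dict, include: Optional[List[str]]) -> List[str]:
    """Return names listed in `include` that have no entry in `config`."""
    if include is None:  # None means "use every executor in the config"
        return []
    return [name for name in include if name not in config]


agent_config = {"compute": {"provider": "slurm", "partition": "compute"}}
typos = missing_executors(agent_config, ["compute", "gpu"])
```

A non-empty result points at names that would never match an executor, which is a common cause of a workflow that appears not to start.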
See Also
Tasks - For information about task decorators
Configuration - For workflow configuration details
Quick Start - For getting started with Chiltepin
Academy Agents Documentation - For more on Academy