API Reference
This section provides detailed documentation for all Chiltepin modules. Click on any function, class, or method to view its documentation and source code.
Configuration Module
- chiltepin.configure.create_executor(name: str, config: Dict[str, Any], client: Client | None = None) ParslExecutor[source]
Create an Executor specified by the given resource configuration
- Parameters:
name (str) – The name of the resource
config (Dict[str, Any]) – YAML configuration block that contains the resource’s configuration
client (Client | None) – A Globus Compute client to use when instantiating Globus Compute resources. The default is None. If None, one will be instantiated automatically for any Globus Compute resources in the configuration. Only applies to Globus Compute resources
- Return type:
ParslExecutor
- chiltepin.configure.create_globus_compute_executor(name: str, config: Dict[str, Any], client: Client | None = None) GlobusComputeExecutor[source]
Construct a GlobusComputeExecutor from the input configuration
- Parameters:
name (str) – A label that will be assigned to the returned GlobusComputeExecutor for naming purposes
config (Dict[str, Any]) –
YAML configuration block that contains the following configuration options:
Option key Default value “endpoint id”: No default, this option is required “mpi” False “max mpi apps”: 1 “mpi_launcher”: “srun” for Slurm, otherwise “mpiexec” “provider”: “localhost” “cores per node”: 1 “nodes per block”: 1 “init blocks”: 0 “min blocks”: 0 “max blocks”: 1 “exclusive”: True “partition”: “” “queue”: “” “account”: “” “walltime”: “00:10:00” “environment”: []
client (Client | None) – The Globus Compute client to use for instantiating the GlobusComputeExecutor. If not specified, Globus Compute will instantiate and use a default client.
- Return type:
GlobusComputeExecutor
- chiltepin.configure.create_htex_executor(name: str, config: Dict[str, Any]) HighThroughputExecutor[source]
Construct a HighThroughputExecutor from the input configuration
- Parameters:
name (str) – A label that will be assigned to the returned HighThroughputExecutor for naming purposes
config (Dict[str, Any]) –
YAML configuration block that contains the following configuration options:
Option key Default value “provider”: “localhost” “cores per node”: 1 “nodes per block”: 1 “init blocks”: 0 “min blocks”: 0 “max blocks”: 1 “exclusive”: True “partition”: None “queue”: None “account”: None “walltime”: “00:10:00” “environment”: []
- Return type:
HighThroughputExecutor
- chiltepin.configure.create_mpi_executor(name: str, config: Dict[str, Any]) MPIExecutor[source]
Construct a MPIExecutor from the input configuration
- Parameters:
name (str) – A label that will be assigned to the returned MPIExecutor for naming purposes
config (Dict[str, Any]) –
YAML configuration block that contains the following configuration options:
Option key Default value “max mpi apps”: 1 “mpi_launcher”: “srun” for Slurm, otherwise “mpiexec” “provider”: “localhost” “cores per node”: 1 “nodes per block”: 1 “init blocks”: 0 “min blocks”: 0 “max blocks”: 1 “exclusive”: True “partition”: None “queue”: None “account”: None “walltime”: “00:10:00” “environment”: []
- Return type:
MPIExecutor
- chiltepin.configure.create_provider(config: Dict[str, Any]) ExecutionProvider[source]
Create the appropriate ExecutionProvider from the given configuration
- Parameters:
config (Dict[str, Any]) –
YAML configuration block that contains the following configuration options. Not all options are valid for all providers.
Options for all providers:
Option key Default value “mpi” False “provider”: “localhost” “init blocks”: 0 “min blocks”: 0 “max blocks”: 1 “environment”: []
Options for Slurm provider: “cores per node”: 1 “nodes per block”: 1 “exclusive”: True “partition”: None “queue”: None “account”: None “walltime”: “00:10:00”
Options for PBSPro provider: “cores per node”: 1 “nodes per block”: 1 “queue”: None “account”: None “walltime”: “00:10:00”
- Return type:
ExecutionProvider
- chiltepin.configure.load(config: Dict[str, Any], include: List[str] | None = None, client: Client | None = None, run_dir: str | None = None) Config[source]
Return a Parsl Config initialized by a list of Executors created from the input configuration dictionary.
The Config object returned by this function is used in parsl.load(config)
- Parameters:
config (Dict[str, Any]) – YAML configuration block that contains the configuration for a list of resources
include (List[str] | None) – A list of the labels of the resource configurations to load. The default is None. If None, all resource configurations are loaded. Otherwise the configurations for resources whose labels are in the list will be loaded.
client (Client | None) – A Globus Compute client to use when instantiating Globus Compute resources. The default is None. If None, one will be instantiated automatically for any Globus Compute resources in the configuration.
run_dir (str | None) – The directory to use for runtime files. The default is None, which means Parsl’s default runinfo directory location will be used.
- Return type:
Config
Workflow Module
Workflow context managers for Chiltepin.
This module provides context managers that wrap Parsl configuration and lifecycle management, eliminating the need for users to directly import or interact with Parsl.
- class chiltepin.workflow.Workflow(config: str | Path | Dict[str, Any] | None = None, *, include: List[str] | None = None, run_dir: str | None = None, client: Client | None = None, log_file: str | None = None, log_level: int | None = None)[source]
Bases:
objectWorkflow context manager for Chiltepin.
Can be used as a context manager or with explicit start/cleanup calls. This wraps Parsl configuration and lifecycle management, eliminating the need for users to directly import or interact with Parsl.
- Parameters:
config (str, Path, dict, or None) – Either a path to a YAML configuration file, a configuration dictionary, or None to use default configuration (local executor only)
include (list of str, optional) – List of resource labels to load. If None, all resources are loaded. Note: The default “local” resource is always available, regardless of this parameter.
run_dir (str, optional) – Directory for Parsl runtime files. If None, uses Parsl’s default.
client (globus_compute_sdk.Client, optional) – Globus Compute client for Globus Compute resources. If None, one will be created automatically if needed.
log_file (str, optional) – Path to Parsl log file. If None, no file logging is configured.
log_level (int, optional) – Logging level (e.g., logging.DEBUG). Only used if log_file is provided.
Examples
As a context manager with default configuration:
>>> from chiltepin import Workflow >>> from chiltepin.tasks import python_task >>> >>> @python_task >>> def my_task(): ... return "Hello!" >>> >>> with Workflow() as dfk: # or Workflow(None) ... result = my_task() ... print(result.result())
With a config file or dictionary:
>>> with Workflow("config.yaml") as dfk: ... result = my_task() ... print(result.result())
With explicit start/cleanup (for use across methods):
>>> workflow = Workflow("config.yaml") >>> dfk = workflow.start() >>> # ... do work, potentially in other methods ... >>> workflow.cleanup()
- cleanup(suppress_exceptions=False)[source]
Cleanup the workflow and release resources.
This should be called when the workflow is complete. If using as a context manager, this is called automatically.
- Parameters:
suppress_exceptions (bool) – If True, exceptions during cleanup are logged but not raised. This is used when cleaning up after a user exception to ensure the user’s exception is not masked by cleanup exceptions.
Notes
All cleanup operations are attempted even if some fail. If multiple cleanup operations raise exceptions and suppress_exceptions is False, they are chained together using __cause__, with the last exception being raised.
- start()[source]
Start the workflow and return DataFlowKernel.
- Returns:
The Parsl DataFlowKernel instance. Can be used to access workflow state (e.g., dfk.tasks) or for advanced operations.
- Return type:
DataFlowKernel
- Raises:
RuntimeError – If workflow is already started.
Endpoint Management Module
- chiltepin.endpoint.configure(name: str, config_dir: str | None = None, timeout: float | None = None) bool[source]
Configure a Globus Compute Endpoint
This is a thin wrapper around the globus-compute-endpoint configure command. Additional configuration steps, usually done manually by the user after configuration, are taken to hide complexity from the user.
- Parameters:
name (str) – Name of the endpoint to configure
config_dir (str | None) – Path to endpoint configuration directory where endpoint information is to be stored. If None (the default), then $HOME/.globus_compute is used
timeout (float | None) – Number of seconds to wait for the command to complete before timing out Default is None, meaning the command will never time out.
- chiltepin.endpoint.delete(name: str, config_dir: str | None = None, timeout: float | None = None)[source]
Delete the specified Globus Compute Endpoint
This is a thin wrapper around the globus-compute-endpoint delete command
- Parameters:
name (str) – Name of the endpoint to delete
config_dir (str | None) – Path to endpoint configuration directory where endpoint information is stored. If None (the default), then $HOME/.globus_compute is used
timeout (float | None) – Number of seconds to wait for the command to complete before timing out Default is None, meaning the command will never time out.
- chiltepin.endpoint.exists(name: str, config_dir: str | None = None) bool[source]
Return True if the endpoint exists, otherwise False
- chiltepin.endpoint.get_chiltepin_apps() Tuple[GlobusApp, GlobusApp, GlobusApp][source]
Get the Globus Apps for Globus Compute, Globus Transfer, and Academy Exchange
This instantiates GlobusApp objects for use in creating Globus Compute and Globus Transfer clients and for authenticating to the Academy Agent Exchange server. If the environment contains settings that specify client ids and/or client secrets, those will be used to create the Globus Apps. Otherwise, the default Chiltepin thick client will be used. If a secret is present in the environment, ClientApp objects will be created. Otherwise, UserApp objects will be created. This is used by the login() and logout() functions where login and logout flows are initiated after the apps are retrieved. A tuple is returned where the first item is the compute app, the second item is the transfer app, and the third item is the academy app. When called while environment variables are set for a client id and secret, this will return apps configured with those credentials.
- Return type:
Tuple[GlobusApp, GlobusApp, GlobusApp]
- chiltepin.endpoint.is_running(name: str, config_dir: str | None = None) bool[source]
Return True if the endpoint is running, otherwise False
- chiltepin.endpoint.login() Dict[str, Client | TransferClient][source]
Log in to the Chiltepin app
This initiates the Globus login flow to log the user in to the Globus compute and transfer services as well as the Academy Exchange. The login will use the registered Chiltepin thick client by default, or the client id and/or secret specified in the environment. This returns a Globus Compute client and a Globus Transfer client in a dictionary. Those clients can then be used for accessing those services.
- Return type:
Dict[str, Client | TransferClient]
- chiltepin.endpoint.login_required() bool[source]
Check whether a chiltepin login is required to use the requested Globus scopes needed by the Chiltepin transfer, compute, and Academy Apps.
- Return type:
- chiltepin.endpoint.logout()[source]
Log out of the Chiltepin app
This logs the user out of the Globus Compute, Globus Transfer, and Academy services and revokes all credentials associated with them.
- chiltepin.endpoint.show(config_dir: str | None = None) Dict[str, Dict[str, str | None]][source]
Return a dictionary of configured Globus Compute Endpoints
This returns endpoint information in a dict with keys corresponding to the endpoint names.
- chiltepin.endpoint.start(name: str, config_dir: str | None = None, timeout: float | None = None)[source]
Start the specified Globus Compute Endpoint
This is a thin wrapper around the globus-compute-endpoint start command
- Parameters:
name (str) – Name of the endpoint to start
config_dir (str | None) – Path to endpoint configuration directory where endpoint information is stored. If None (the default), then $HOME/.globus_compute is used
timeout (float | None) – Number of seconds to wait for the command to complete before timing out Default is None, meaning the command will never time out.
- chiltepin.endpoint.stop(name: str, config_dir: str | None = None, timeout: float | None = None)[source]
Stop the specified Globus Compute Endpoint
This is a thin wrapper around the globus-compute-endpoint stop command
- Parameters:
name (str) – Name of the endpoint to stop
config_dir (str | None) – Path to endpoint configuration directory where endpoint information is stored. If None (the default), then $HOME/.globus_compute is used
timeout (float | None) – Number of seconds to wait for the command to complete before timing out Default is None, meaning the command will never time out.
Tasks Module
Task decorators for Chiltepin workflows.
This module provides decorators for defining workflow tasks that can be executed on configured resources. Tasks are the fundamental units of work in Chiltepin workflows.
Available Decorators
python_task(): Execute Python functions as workflow tasksbash_task(): Execute shell commands as workflow tasksjoin_task(): Coordinate multiple tasks without blocking workflow execution
For comprehensive usage examples and best practices, see the Tasks documentation.
Examples
Define a simple Python task:
from chiltepin.tasks import python_task
@python_task
def add_numbers(a, b):
return a + b
# Execute on a specific resource
result = add_numbers(5, 3, executor=["compute"]).result()
Define a bash task:
from chiltepin.tasks import bash_task
@bash_task
def list_files(directory):
return f"ls -la {directory}"
# Returns exit code (0 = success)
exit_code = list_files("/tmp", executor=["compute"]).result()
Define an MPI task using task geometry:
@bash_task
def run_mpi_simulation(input_file):
return f"$PARSL_MPI_PREFIX ./simulation {input_file}"
# Specify parallel resource requirements
exit_code = run_mpi_simulation(
"config.in",
executor=["mpi"],
chiltepin_task_geometry={
"num_nodes": 4,
"num_ranks": 16,
"ranks_per_node": 4
}
).result()
- class chiltepin.tasks.MethodWrapper(func, wrapper_func)[source]
Bases:
objectWrapper that preserves method behavior for decorated functions.
This descriptor ensures that when a decorated function is accessed as a method, it properly creates a bound method with the instance.
- chiltepin.tasks.bash_task(function: Callable) Callable[source]
Decorator function for making Chiltepin bash tasks.
The decorator transforms the function into a Parsl bash_app but adds an executor argument such that the executor for the function can be chosen dynamically at runtime.
- Parameters:
function (Callable) – The function to be decorated to yield a Bash workflow task. This function can be a stand-alone function or a class method. If it is a class method, it can make use of self to access object state. The function must return a string that contains a series of bash commands to be executed.
time (The decorated function includes the following additional parameters at call) –
executor (str or list of str, default="all") – Resource name(s) where the task should execute. Can be a single resource name or a list of resource names. Defaults to “all” which allows execution on any configured resource.
chiltepin_task_geometry (dict, optional) –
Specification of parallel task geometry for MPI applications. This parameter is mapped to Parsl’s
parsl_resource_specification. The dictionary should contain:num_nodes (int): Number of nodes required for the task
num_ranks (int): Total number of MPI ranks
ranks_per_node (int): Number of MPI ranks per node
Example:
chiltepin_task_geometry={ "num_nodes": 4, "num_ranks": 16, "ranks_per_node": 4 }
stdout (str or tuple, optional) – File path for capturing standard output. Can be a string path or a tuple of (path, mode) where mode is typically ‘w’ for write or ‘a’ for append.
stderr (str or tuple, optional) – File path for capturing standard error. Can be a string path or a tuple of (path, mode) where mode is typically ‘w’ for write or ‘a’ for append.
inputs (list of AppFuture, optional) – List of futures that must complete before this task starts. Used to create task dependencies without passing data between tasks. See Parsl’s documentation for details.
note:: (..) – All keyword arguments supported by Parsl’s
bash_appdecorator (such asoutputs,walltime, etc.) are also accepted and passed through to the underlying Parsl app.warning:: (..) –
Class Method Limitation: When decorating class methods, the entire class instance (
self) must be picklable because it gets serialized and sent to remote workers. Classes that contain unpicklable objects (network connections, file handles, database connections, remote proxies, etc.) cannot be used with this decorator.See the
python_taskdecorator documentation for examples of recommended workarounds.
- Returns:
The decorated function that can be called as a workflow task. Returns the exit code of the bash command (0 indicates success).
- Return type:
Callable
Examples
Basic usage:
@bash_task def compile_code(): return "gcc -o program program.c" exit_code = compile_code(executor=["compute"]).result()
MPI task with task geometry:
@bash_task def run_mpi_simulation(input_file): return f"$PARSL_MPI_PREFIX ./simulation {input_file}" exit_code = run_mpi_simulation( "config.in", executor=["mpi"], chiltepin_task_geometry={"num_nodes": 4, "num_ranks": 16, "ranks_per_node": 4}, stdout="output.log" ).result()
- chiltepin.tasks.join_task(function: Callable) Callable[source]
Decorator function for making Chiltepin join tasks.
The decorator transforms the function into a Parsl join_app. A parsl @join_app decorator accomplishes the same thing. This decorator is added to provide API consistency so that users can use @join_task rather than @join_app along with @python_task and @bash_task.
- Parameters:
function (Callable) – The function to be decorated to yield a join workflow task. This function can be a stand-alone function or a class method. If it is a class method, it can make use of self to access object state. The function is expected to call multiple python or bash tasks and return a Future that encapsulates the result of those tasks.
warning:: (..) –
Class Method Limitation: When decorating class methods, the entire class instance (
self) must be picklable because it gets serialized and sent to remote workers. Classes that contain unpicklable objects (network connections, file handles, database connections, remote proxies, etc.) cannot be used with this decorator.See the
python_taskdecorator documentation for examples of recommended workarounds.
- Return type:
Callable
- chiltepin.tasks.python_task(function: Callable) Callable[source]
Decorator function for making Chiltepin python tasks.
The decorator transforms the function into a Parsl python_app but adds an executor argument such that the executor for the function can be chosen dynamically at runtime.
- Parameters:
function (Callable) – The function to be decorated to yield a Python workflow task. This function can be a stand-alone function or a class method. If it is a class method, it can make use of self to access object state.
time (The decorated function includes the following additional parameters at call) –
executor (str or list of str, default="all") – Resource name(s) where the task should execute. Can be a single resource name or a list of resource names. Defaults to “all” which allows execution on any configured resource.
chiltepin_task_geometry (dict, optional) –
Specification of parallel task geometry for MPI applications. This parameter is mapped to Parsl’s
parsl_resource_specification. The dictionary should contain:num_nodes (int): Number of nodes required for the task
num_ranks (int): Total number of MPI ranks
ranks_per_node (int): Number of MPI ranks per node
Example:
chiltepin_task_geometry={ "num_nodes": 4, "num_ranks": 16, "ranks_per_node": 4 }
inputs (list of AppFuture, optional) – List of futures that must complete before this task starts. Used to create task dependencies without passing data between tasks. See Parsl’s documentation for details.
note:: (..) – All keyword arguments supported by Parsl’s
python_appdecorator (such asoutputs,walltime, etc.) are also accepted and passed through to the underlying Parsl app.warning:: (..) –
Class Method Limitation: When decorating class methods, the entire class instance (
self) must be picklable because it gets serialized and sent to remote workers. Classes that contain unpicklable objects (network connections, file handles, database connections, remote proxies, etc.) cannot be used with this decorator.Workaround 1 - Standalone Functions: Extract the task logic into a standalone function and pass only picklable data as arguments:
@python_task def process_data(config, text): return f"Processed {text}" class MyClass: def __init__(self): self.config = {"param": "value"} # Picklable self.connection = NetworkConnection() # NOT picklable def my_method(self, text): return process_data(self.config, text)
Workaround 2 - Helper Class: Create a helper class with only picklable state to hold task methods:
class TaskBehavior: @python_task def process(self, config, text): return f"Processed {text}" class MyClass: def __init__(self): self.config = {"param": "value"} self.connection = NetworkConnection() # NOT picklable self.tasks = TaskBehavior() def my_method(self, text): return self.tasks.process(self.config, text, executor=["compute"])
- Returns:
The decorated function that can be called as a workflow task.
- Return type:
Callable
Examples
Basic usage:
@python_task def compute(x): return x ** 2 result = compute(5, executor=["compute"]).result()
MPI task with task geometry:
@python_task def run_mpi_code(params): # MPI code execution return "result" future = run_mpi_code( params, executor=["mpi"], chiltepin_task_geometry={"num_nodes": 2, "num_ranks": 8, "ranks_per_node": 4} )
Agents Module
Agent system integration for Chiltepin workflows.
This module provides simplified interfaces for integrating Academy agents with Chiltepin workflows and Parsl executors.
When using @chiltepin_agent decorator, always import agent_action and agent_loop decorators from chiltepin.agents, NOT from academy.agent:
from chiltepin.agents import chiltepin_agent, agent_action, agent_loop # ✅ Correct
from academy.agent import action, loop # ❌ Wrong for @chiltepin_agent
Chiltepin’s decorators work with both sync and async methods, while Academy’s action decorator requires async methods only.
- class chiltepin.agents.AgentSystem(workflow: Workflow, executor_names: List[str], exchange_address: str = 'https://exchange.academy-agents.org', auth_method: str = 'globus')[source]
Bases:
objectSimplified agent management system for Chiltepin workflows.
This class wraps the complexity of setting up Academy Manager with ParslPoolExecutors and HttpExchangeFactory, providing a clean interface for users who always use this pattern.
- Parameters:
Examples
from chiltepin import Workflow from chiltepin.agents import AgentSystem config = {"my-executor": {...}} workflow = Workflow(config) workflow.start() agent_system = AgentSystem( workflow=workflow, executor_names=["my-executor"], exchange_address="https://exchange.academy-agents.org" ) async with await agent_system.manager() as manager: # Launch and interact with agents agent = await manager.launch(MyAgent, executor="my-executor") result = await agent.some_action() workflow.cleanup()
- property executors: Dict[str, ParslPoolExecutor] | None
Access the created ParslPoolExecutors.
Returns None if executors haven’t been created yet.
- async manager() ChiltepinManager[source]
Create and return a Chiltepin Manager context manager.
This method returns a Chiltepin Manager configured with HttpExchangeFactory using Globus authentication. The Manager is created with ParslPoolExecutors for all configured executors.
- Returns:
An async context manager for the Chiltepin Manager
- Return type:
Examples
async with await agent_system.manager() as manager: agent = await manager.launch(MyAgent, executor="my-executor") result = await agent.some_action()
- class chiltepin.agents.ChiltepinManager(exchange_client: ExchangeClient[ExchangeTransportT], executors: Executor | MutableMapping[str, Executor | None] | None = None, *, default_executor: str = 'event_loop', max_retries: int = 0)[source]
Bases:
ManagerCustom Manager that supports agent_workflow_config=, agent_workflow_include=, and agent_workflow_run_dir= kwargs in launch().
This Manager subclass intercepts launch() calls to extract chiltepin-specific keyword arguments (agent_workflow_config, agent_workflow_include, agent_workflow_run_dir) and passes them to agents created with the @chiltepin_agent decorator.
This keeps workflow infrastructure concerns (Parsl configuration) separate from behavior logic, allowing behavior classes to focus on domain logic only.
- async launch(agent_class: Type[AgentT], args: Tuple[Any, ...] | None = None, kwargs: Dict[str, Any] | None = None, agent_workflow_config: str | Path | Dict[str, Any] | None = None, agent_workflow_include: List[str] | None = None, agent_workflow_run_dir: str | None = None, **manager_kwargs: Any) Handle[AgentT][source]
Launch an agent, supporting chiltepin-specific configuration.
- Parameters:
agent_class (Type[AgentT]) – The agent class to launch
args (Optional[Tuple[Any, ...]]) – Tuple of positional arguments for agent __init__ (behavior logic only)
kwargs (Optional[Dict[str, Any]]) – Dict of keyword arguments for agent __init__ (behavior logic only)
agent_workflow_config (Optional[Union[str, Path, Dict[str, Any]]]) – Workflow configuration dict or path (chiltepin agents only)
agent_workflow_include (Optional[List[str]]) – Optional list of executor labels for workflow (chiltepin agents only)
agent_workflow_run_dir (Optional[str]) – Optional run directory for workflow (chiltepin agents only). Important: When launching multiple agents on shared filesystems, provide unique run_dir values to avoid Parsl directory collisions. If omitted, a unique directory is auto-generated.
**manager_kwargs (Any) – Other keyword arguments for Manager (e.g., executor, resources)
- Returns:
The launched agent proxy
- Return type:
Handle[AgentT]
Examples
model = await manager.launch( MyModel, agent_workflow_config=ursa_config, # ← Workflow config agent_workflow_include=["ursa-compute"], # ← Which executors agent_workflow_run_dir="/custom/path", # ← Where to run args=(25.0,), # ← Behavior args only executor="ursa-service-gc" # ← Manager executor )
- chiltepin.agents.agent_action(func: Callable) Callable[source]
Marker decorator to indicate a method should be exposed as an agent action.
Use this decorator on methods in classes decorated with @chiltepin_agent to mark them as actions that should be exposed through the agent interface.
Unlike Academy’s @action decorator, this works for both sync and async methods, making it suitable for @python_task decorated methods as well as async helpers.
Examples
from chiltepin.agents import chiltepin_agent, agent_action from chiltepin.tasks import python_task @chiltepin_agent(agent_workflow_include=["compute"]) class MyModel: @python_task @agent_action # ← Works for sync task methods def compute(self): return "result" @agent_action # ← Also works for async methods async def get_status(self): return "ready"
Important
Always import from chiltepin.agents, not academy.agent:
from chiltepin.agents import agent_action # ✅ Correct from academy.agent import action # ❌ Wrong - different semantics
Using Academy’s decorator will cause confusing errors. Academy’s @action requires async methods, while chiltepin’s works with any callable.
- chiltepin.agents.agent_loop(func: Callable) Callable[source]
Marker decorator for background agent_loop methods in chiltepin agents.
Use this decorator on async methods with a ‘shutdown’ parameter that should run as background loops in classes decorated with @chiltepin_agent.
This is equivalent to Academy’s @loop decorator but provided here for consistency so all decorators can be imported from chiltepin.agents.
Important
The decorated method MUST be async. The decorator validates this at decoration time and will raise a TypeError if applied to a non-async method. Background loops need to be async to properly cooperate with the agent’s event loop.
Important
Always import from chiltepin.agents, not academy.agent:
from chiltepin.agents import agent_loop # ✅ Correct from academy.agent import loop # ❌ Wrong - will cause type errors
Using Academy’s decorator will cause confusing signature validation errors.
Examples
from chiltepin.agents import chiltepin_agent, agent_loop import asyncio @chiltepin_agent(agent_workflow_include=["compute"]) class MyModel: @agent_loop async def update_data(self, shutdown): while not shutdown.is_set(): await asyncio.sleep(1) # Update state
- chiltepin.agents.chiltepin_agent(*, agent_workflow_include: List[str] | None = None, agent_workflow_run_dir: str | None = None) Callable[[Type], Type[Agent]][source]
Decorator that wraps a regular Python class (behavior) in an Academy Agent.
This decorator allows you to write agent behavior as a regular, serializable Python class where task-decorated methods can access instance state directly. The decorator automatically creates an Agent wrapper that manages the workflow lifecycle and exposes the behavior’s methods as actions.
Only methods marked with @agent_action or @agent_loop decorators are exposed as agent actions. Use @agent_action for any method (sync or async) you want to expose, and @agent_loop for background loops. Both decorators should be imported from chiltepin.agents.
- Parameters:
agent_workflow_include (Optional[List[str]]) – Default list of resource labels to load. Can be overridden at runtime using agent_workflow_include= keyword argument in manager.launch(). If None, all resources are loaded.
agent_workflow_run_dir (Optional[str]) – Default directory for Parsl runtime files. Can be overridden at runtime using agent_workflow_run_dir= keyword argument in manager.launch(). If None, a unique directory is auto-generated to prevent collisions when multiple agents run on shared filesystems.
- Returns:
A decorator function that wraps the behavior class in an Agent.
- Return type:
Callable[[Type], Type[Agent]]
Notes
- Runtime Configuration:
When using AgentSystem (which provides ChiltepinManager), pass workflow configuration using keyword arguments to launch(). This separates infrastructure from behavior logic.
Examples
Runtime configuration:
@chiltepin_agent(agent_workflow_include=["default-executor"]) class MyModel: def __init__(self, temperature): # ← No parsl config! Pure domain logic self.temperature = temperature # Launch with runtime configuration model = await manager.launch( MyModel, agent_workflow_config=ursa_config, # ← Workflow config used by the agent's workflow context args=(25.0,), # ← Behavior args only (domain logic) agent_workflow_include=["runtime-executor"], # ← Override decorator default - executors the agent should use to run tasks executor="ursa-service-gc" # ← The executor to use for launching the agent itself (infrastructure) )
Basic agent creation:
from chiltepin.agents import chiltepin_agent, agent_action, agent_loop from chiltepin.tasks import python_task @chiltepin_agent(agent_workflow_include=["ursa-compute"]) # ← Default, can be overridden class MyModel: '''Regular Python class - fully serializable!''' def __init__(self, temperature: float): '''Initialize behavior with domain logic only.''' self.temperature = temperature @agent_action # ← Use @agent_action for sync/task-decorated methods @python_task def run_model(self) -> str: # Import modules inside methods for serialization import random # Can directly access self.temperature! return f"Predicted: {self.temperature + random.uniform(0, 5):.2f} degrees" @agent_action # ← Use @agent_action for async methods too async def get_status(self) -> str: return f"Temperature: {self.temperature:.2f}" @agent_loop # ← Use @agent_loop for background loops async def update_temperature(self, shutdown) -> None: # Import modules inside methods for serialization import asyncio import random while not shutdown.is_set(): await asyncio.sleep(1) self.temperature += random.uniform(-3, 3) def _private_helper(self): # Not decorated with @agent_action, won't be exposed pass # Launch agent using decorator defaults (agent_workflow_include=["ursa-compute"]) model = await manager.launch(MyModel, agent_workflow_config=config, args=(25,)) # Override decorator defaults at runtime model = await manager.launch( MyModel, agent_workflow_config=config, args=(25,), agent_workflow_include=["runtime-executor"], # ← Override decorator's agent_workflow_include ) result = await model.run_model() status = await model.get_status()
Data Module
Data transfer and management for Chiltepin workflows.
This module provides specialized tasks for transferring and deleting data between Globus data transfer endpoints. These tasks integrate seamlessly with Parsl workflows.
Available Tasks
transfer_task(): Transfer files/directories between Globus data endpointsdelete_task(): Delete files/directories from Globus data endpoints
Available Functions
transfer(): Synchronous data transfer using Globusdelete(): Synchronous data deletion using Globus
For comprehensive usage examples and best practices, see the Data Transfer and Management documentation.
Examples
Stage, process, and cleanup data in a workflow:
from chiltepin.data import transfer_task, delete_task
from chiltepin.tasks import python_task
@python_task
def process_data(input_path):
# Process the data
with open(input_path, 'r') as f:
data = f.read()
return len(data)
# Stage data to compute resource
stage = transfer_task(
src_ep="my-laptop",
dst_ep="hpc-scratch",
src_path="/data/input.dat",
dst_path="/scratch/input.dat",
executor=["local"]
)
# Process the staged data (waits for stage via inputs parameter)
result = process_data("/scratch/input.dat", executor=["compute"], inputs=[stage])
# Clean up after processing completes (waits for result via inputs)
cleanup = delete_task(
src_ep="hpc-scratch",
src_path="/scratch/input.dat",
executor=["local"],
inputs=[result]
)
# Get final result
output = result.result()
- chiltepin.data.delete(src_ep: str, src_path: str, timeout: int = 3600, polling_interval: int = 30, client: TransferClient | None = None, recursive: bool = False)[source]
Delete data synchronously with Globus.
This deletes data from a Globus endpoint. This function will not return until the deletion has completed or failed.
- Parameters:
src_ep (str) – Name of the source endpoint for the data to be deleted. Can be a display name or a UUID string.
src_path (str) – Path to the file or directory on the source endpoint that is to be deleted.
timeout (int) – Number of seconds to wait for the deletion to complete.
polling_interval (int) – Number of seconds to wait between checking the status of the deletion
client (TransferClient | None) – Transfer client to use for submitting the deletion. If None, one will be retrieved via the login process. If a login has already been performed, no login flow prompts will be issued. NOTE: Yes, deletion is done using a TransferClient.
recursive (bool) – Whether or not a recursive deletion should be performed
- chiltepin.data.delete_task(src_ep: str, src_path: str, timeout: int = 3600, polling_interval: int = 30, client: TransferClient | None = None, recursive: bool = False)[source]
Delete data asynchronously in a Parsl task
This wraps synchronous Globus data deletion into a Parsl python_app task. Calling this function will immediately return a future. The result of the future will be True if the deletion completed successfully, or False if it did not.
- Parameters:
src_ep (str) – Name of the source endpoint for the data to be deleted. Can be a display name or a UUID string.
src_path (str) – Path to the file or directory on the source endpoint that is to be deleted.
timeout (int) – Number of seconds to wait for the deletion to complete.
polling_interval (int) – Number of seconds to wait between checking the status of the deletion
client (TransferClient | None) – Transfer client to use for submitting the deletion. If None, one will be retrieved via the login process. If a login has already been performed, no login flow prompts will be issued. NOTE: Yes, deletion is done using a TransferClient.
recursive (bool) – Whether or not a recursive deletion should be performed
- chiltepin.data.transfer(src_ep: str, dst_ep: str, src_path: str, dst_path: str, timeout: int = 3600, polling_interval: int = 30, client: TransferClient | None = None, recursive: bool = False)[source]
Transfer data synchronously with Globus
This performs a Globus transfer of data from one Globus transfer endpoint to another. This function will not return until the transfer completes or fails.
- Parameters:
src_ep (str) – Name of the source endpoint for the transfer. Can be a display name or a UUID string.
dst_ep (str) – Name of the destination endpoint for the transfer. Can be a display name or a UUID string.
src_path (str) – Path to the file or directory on the source endpoint that is to be transferred.
dst_path (str) – Path to the file or directory on the destination endpoint where the data is to be transferred.
timeout (int) – Number of seconds to wait for the transfer to complete.
polling_interval (int) – Number of seconds to wait between checking the status of the transfer
client (TransferClient | None) – Transfer client to use for submitting the transfers. If None, one will be retrieved via the login process. If a login has already been performed, no login flow prompts will be issued.
recursive (bool) – Whether or not a recursive transfer should be performed
- chiltepin.data.transfer_task(src_ep: str, dst_ep: str, src_path: str, dst_path: str, timeout: int = 3600, polling_interval: int = 30, client: TransferClient | None = None, recursive: bool = False)[source]
Transfer data asynchronously in a Parsl task
This wraps synchronous Globus data transfer into a Parsl python_app task. Calling this function will immediately return a future. The result of the future will be True if the transfer completed successfully, or False if it did not.
- Parameters:
src_ep (str) – Name of the source endpoint for the transfer. Can be a display name or a UUID string.
dst_ep (str) – Name of the destination endpoint for the transfer. Can be a display name or a UUID string.
src_path (str) – Path to the file or directory on the source endpoint that is to be transferred.
dst_path (str) – Path to the file or directory on the destination endpoint where the data is to be transferred.
timeout (int) – Number of seconds to wait for the transfer to complete.
polling_interval (int) – Number of seconds to wait between checking the status of the transfer
client (TransferClient | None) – Transfer client to use for submitting the transfers. If None, one will be retrieved via the login process. If a login has already been performed, no login flow prompts will be issued.
recursive (bool) – Whether or not a recursive transfer should be performed