# SPDX-License-Identifier: Apache-2.0
"""Task decorators for Chiltepin workflows.
This module provides decorators for defining workflow tasks that can be executed
on configured resources. Tasks are the fundamental units of work in Chiltepin workflows.
Available Decorators
--------------------
- :func:`python_task`: Execute Python functions as workflow tasks
- :func:`bash_task`: Execute shell commands as workflow tasks
- :func:`join_task`: Coordinate multiple tasks without blocking workflow execution
For comprehensive usage examples and best practices, see the :doc:`tasks` documentation.
Examples
--------
Define a simple Python task::
from chiltepin.tasks import python_task
@python_task
def add_numbers(a, b):
return a + b
# Execute on a specific resource
result = add_numbers(5, 3, executor=["compute"]).result()
Define a bash task::
from chiltepin.tasks import bash_task
@bash_task
def list_files(directory):
return f"ls -la {directory}"
# Returns exit code (0 = success)
exit_code = list_files("/tmp", executor=["compute"]).result()
Define an MPI task using task geometry::
@bash_task
def run_mpi_simulation(input_file):
return f"$PARSL_MPI_PREFIX ./simulation {input_file}"
# Specify parallel resource requirements
exit_code = run_mpi_simulation(
"config.in",
executor=["mpi"],
chiltepin_task_geometry={
"num_nodes": 4,
"num_ranks": 16,
"ranks_per_node": 4
}
).result()
"""
from functools import wraps
from inspect import Parameter, signature
from typing import Callable, Optional
from parsl.app.app import bash_app, join_app, python_app
def _create_filtered_wrapper(function: Callable) -> Callable:
"""Create a wrapper that filters kwargs to only pass what the function accepts.
This helper function creates an intermediate wrapper for Parsl app decorators.
The wrapper accepts any arguments that Parsl injects (stdout, stderr, etc.)
but only forwards the ones that the user's function signature expects.
Parameters
----------
function: Callable
The user's function to wrap
Returns
-------
Callable
A wrapper function that filters kwargs based on the function's signature
"""
sig = signature(function)
func_params = sig.parameters
# Check if the function accepts **kwargs (VAR_KEYWORD)
has_var_keyword = any(
param.kind == Parameter.VAR_KEYWORD for param in func_params.values()
)
@wraps(function)
def wrapper(*args, **kwargs):
# If function has **kwargs, pass all kwargs through
# Otherwise, filter to only include parameters the function explicitly accepts
if has_var_keyword:
return function(*args, **kwargs)
else:
filtered_kwargs = {k: v for k, v in kwargs.items() if k in func_params}
return function(*args, **filtered_kwargs)
return wrapper
def _merge_chiltepin_task_geometry(
chiltepin_task_geometry: Optional[dict], kwargs: dict
) -> None:
"""Merge chiltepin_task_geometry into parsl_resource_specification in kwargs.
This helper function handles the merging of chiltepin_task_geometry into the
parsl_resource_specification parameter. If both are provided, they are merged
with chiltepin_task_geometry taking precedence for overlapping keys. If
parsl_resource_specification is None, it is treated as an empty dict.
Parameters
----------
chiltepin_task_geometry: dict or None
Task geometry specification to merge
kwargs: dict
Keyword arguments dictionary to modify in place
Returns
-------
None
Modifies kwargs in place
Raises
------
TypeError
If chiltepin_task_geometry is not None and not a dict
"""
if chiltepin_task_geometry is not None:
# Validate that chiltepin_task_geometry is a dict
if not isinstance(chiltepin_task_geometry, dict):
raise TypeError(
f"chiltepin_task_geometry must be a dict, got {type(chiltepin_task_geometry).__name__}"
)
# Make a defensive copy to prevent mutations
geometry_copy = dict(chiltepin_task_geometry)
if "parsl_resource_specification" in kwargs:
existing_spec = kwargs["parsl_resource_specification"]
# Treat None as an empty dict
if existing_spec is None:
existing_spec = {}
# Merge: start with existing spec, update with geometry
merged_spec = dict(existing_spec)
merged_spec.update(geometry_copy)
kwargs["parsl_resource_specification"] = merged_spec
else:
kwargs["parsl_resource_specification"] = geometry_copy
[docs]class MethodWrapper:
"""Wrapper that preserves method behavior for decorated functions.
This descriptor ensures that when a decorated function is accessed as a
method, it properly creates a bound method with the instance.
"""
def __init__(self, func, wrapper_func):
self.func = func
self.wrapper_func = wrapper_func
# Copy over metadata
wraps(func)(self)
def __get__(self, obj, objtype=None):
"""Support instance methods."""
if obj is None:
# Accessed on class, return self
return self
# Return a bound version of the wrapper
from functools import partial
return partial(self.wrapper_func, obj)
def __call__(self, *args, **kwargs):
"""Support standalone function calls."""
return self.wrapper_func(*args, **kwargs)
[docs]def python_task(function: Callable) -> Callable:
"""Decorator function for making Chiltepin python tasks.
The decorator transforms the function into a Parsl python_app but adds an executor
argument such that the executor for the function can be chosen dynamically at runtime.
Parameters
----------
function: Callable
The function to be decorated to yield a Python workflow task. This function can be a
stand-alone function or a class method. If it is a class method, it can make use of
`self` to access object state.
Other Parameters
----------------
The decorated function includes the following additional parameters at call time:
executor: str or list of str, default="all"
Resource name(s) where the task should execute. Can be a single resource name or
a list of resource names. Defaults to "all" which allows execution on any configured
resource.
chiltepin_task_geometry: dict, optional
Specification of parallel task geometry for MPI applications. This parameter is
mapped to Parsl's ``parsl_resource_specification``. The dictionary should contain:
- **num_nodes** (int): Number of nodes required for the task
- **num_ranks** (int): Total number of MPI ranks
- **ranks_per_node** (int): Number of MPI ranks per node
Example::
chiltepin_task_geometry={
"num_nodes": 4,
"num_ranks": 16,
"ranks_per_node": 4
}
inputs: list of AppFuture, optional
List of futures that must complete before this task starts. Used to create
task dependencies without passing data between tasks. See Parsl's documentation
for details.
.. note::
All keyword arguments supported by Parsl's ``python_app`` decorator (such as
``outputs``, ``walltime``, etc.) are also accepted and passed through
to the underlying Parsl app.
.. warning::
**Class Method Limitation**: When decorating class methods, the entire class instance
(``self``) must be picklable because it gets serialized and sent to remote workers.
Classes that contain unpicklable objects (network connections, file handles, database
connections, remote proxies, etc.) cannot be used with this decorator.
**Workaround 1 - Standalone Functions**: Extract the task logic into a standalone
function and pass only picklable data as arguments::
@python_task
def process_data(config, text):
return f"Processed {text}"
class MyClass:
def __init__(self):
self.config = {"param": "value"} # Picklable
self.connection = NetworkConnection() # NOT picklable
def my_method(self, text):
return process_data(self.config, text)
**Workaround 2 - Helper Class**: Create a helper class with only picklable state
to hold task methods::
class TaskBehavior:
@python_task
def process(self, config, text):
return f"Processed {text}"
class MyClass:
def __init__(self):
self.config = {"param": "value"}
self.connection = NetworkConnection() # NOT picklable
self.tasks = TaskBehavior()
def my_method(self, text):
return self.tasks.process(self.config, text, executor=["compute"])
Returns
-------
Callable
The decorated function that can be called as a workflow task.
Examples
--------
Basic usage::
@python_task
def compute(x):
return x ** 2
result = compute(5, executor=["compute"]).result()
MPI task with task geometry::
@python_task
def run_mpi_code(params):
# MPI code execution
return "result"
future = run_mpi_code(
params,
executor=["mpi"],
chiltepin_task_geometry={"num_nodes": 2, "num_ranks": 8, "ranks_per_node": 4}
)
"""
def function_wrapper(
*args,
executor="all",
chiltepin_task_geometry=None,
**kwargs,
):
# Map chiltepin_task_geometry to Parsl's parsl_resource_specification
_merge_chiltepin_task_geometry(chiltepin_task_geometry, kwargs)
return python_app(_create_filtered_wrapper(function), executors=executor)(
*args, **kwargs
)
return MethodWrapper(function, function_wrapper)
[docs]def bash_task(function: Callable) -> Callable:
"""Decorator function for making Chiltepin bash tasks.
The decorator transforms the function into a Parsl bash_app but adds an executor
argument such that the executor for the function can be chosen dynamically at runtime.
Parameters
----------
function: Callable
The function to be decorated to yield a Bash workflow task. This function can be a
stand-alone function or a class method. If it is a class method, it can make use of
`self` to access object state. The function must return a string that contains a
series of bash commands to be executed.
Other Parameters
----------------
The decorated function includes the following additional parameters at call time:
executor: str or list of str, default="all"
Resource name(s) where the task should execute. Can be a single resource name or
a list of resource names. Defaults to "all" which allows execution on any configured
resource.
chiltepin_task_geometry: dict, optional
Specification of parallel task geometry for MPI applications. This parameter is
mapped to Parsl's ``parsl_resource_specification``. The dictionary should contain:
- **num_nodes** (int): Number of nodes required for the task
- **num_ranks** (int): Total number of MPI ranks
- **ranks_per_node** (int): Number of MPI ranks per node
Example::
chiltepin_task_geometry={
"num_nodes": 4,
"num_ranks": 16,
"ranks_per_node": 4
}
stdout: str or tuple, optional
File path for capturing standard output. Can be a string path or a tuple of
(path, mode) where mode is typically 'w' for write or 'a' for append.
stderr: str or tuple, optional
File path for capturing standard error. Can be a string path or a tuple of
(path, mode) where mode is typically 'w' for write or 'a' for append.
inputs: list of AppFuture, optional
List of futures that must complete before this task starts. Used to create
task dependencies without passing data between tasks. See Parsl's documentation
for details.
.. note::
All keyword arguments supported by Parsl's ``bash_app`` decorator (such as
``outputs``, ``walltime``, etc.) are also accepted and passed through
to the underlying Parsl app.
.. warning::
**Class Method Limitation**: When decorating class methods, the entire class instance
(``self``) must be picklable because it gets serialized and sent to remote workers.
Classes that contain unpicklable objects (network connections, file handles, database
connections, remote proxies, etc.) cannot be used with this decorator.
See the ``python_task`` decorator documentation for examples of recommended workarounds.
Returns
-------
Callable
The decorated function that can be called as a workflow task. Returns the exit
code of the bash command (0 indicates success).
Examples
--------
Basic usage::
@bash_task
def compile_code():
return "gcc -o program program.c"
exit_code = compile_code(executor=["compute"]).result()
MPI task with task geometry::
@bash_task
def run_mpi_simulation(input_file):
return f"$PARSL_MPI_PREFIX ./simulation {input_file}"
exit_code = run_mpi_simulation(
"config.in",
executor=["mpi"],
chiltepin_task_geometry={"num_nodes": 4, "num_ranks": 16, "ranks_per_node": 4},
stdout="output.log"
).result()
"""
def function_wrapper(
*args,
executor="all",
chiltepin_task_geometry=None,
**kwargs,
):
# Map chiltepin_task_geometry to Parsl's parsl_resource_specification
_merge_chiltepin_task_geometry(chiltepin_task_geometry, kwargs)
return bash_app(_create_filtered_wrapper(function), executors=executor)(
*args, **kwargs
)
return MethodWrapper(function, function_wrapper)
[docs]def join_task(function: Callable) -> Callable:
"""Decorator function for making Chiltepin join tasks.
The decorator transforms the function into a Parsl join_app. A parsl @join_app decorator
accomplishes the same thing. This decorator is added to provide API consistency so that
users can use @join_task rather than @join_app along with @python_task and @bash_task.
Parameters
----------
function: Callable
The function to be decorated to yield a join workflow task. This function can be a
stand-alone function or a class method. If it is a class method, it can make use of
`self` to access object state. The function is expected to call multiple python or
bash tasks and return a Future that encapsulates the result of those tasks.
.. warning::
**Class Method Limitation**: When decorating class methods, the entire class instance
(``self``) must be picklable because it gets serialized and sent to remote workers.
Classes that contain unpicklable objects (network connections, file handles, database
connections, remote proxies, etc.) cannot be used with this decorator.
See the ``python_task`` decorator documentation for examples of recommended workarounds.
Returns
-------
Callable
"""
def function_wrapper(
*args,
**kwargs,
):
return join_app(_create_filtered_wrapper(function))(*args, **kwargs)
return MethodWrapper(function, function_wrapper)