Tasks
Chiltepin provides decorators to define workflow tasks that can be executed on configured resources. Tasks are the fundamental units of work in a Chiltepin workflow.
Note
Chiltepin’s task decorators are thin wrappers around Parsl’s @python_app,
@bash_app, and @join_app decorators. Chiltepin adds two key capabilities:
Method decoration: Support for decorating class methods that reference
selfDynamic resource selection: Ability to choose the execution resource at runtime via the
executorparameter
For more information about Parsl’s execution model and features, see the Parsl documentation.
Overview
Chiltepin offers three task decorators:
@python_task: Execute Python functions as workflow tasks
@bash_task: Execute shell commands as workflow tasks
@join_task: Coordinate multiple tasks without blocking workflow execution
When you decorate a function with one of these decorators, it becomes a workflow task
that can be submitted for execution on any configured resource. The function itself
defines what to execute, while the executor parameter at call time specifies
where to execute it.
Python Tasks
The @python_task decorator transforms a Python function into a workflow task.
The function will be serialized and executed on the specified resource.
Basic Usage
from chiltepin.tasks import python_task
@python_task
def hello_world():
return "Hello from a Chiltepin task!"
# Call the task and specify where to run it
future = hello_world(executor=["my-resource"])
result = future.result() # Wait for completion and get result
print(result) # "Hello from a Chiltepin task!"
Tasks with Arguments
Python tasks can accept both positional and keyword arguments:
@python_task
def add_numbers(a, b, multiply_by=1):
return (a + b) * multiply_by
# Use positional arguments
future1 = add_numbers(5, 3, executor=["compute"])
print(future1.result()) # 8
# Use keyword arguments
future2 = add_numbers(5, 3, multiply_by=2, executor=["compute"])
print(future2.result()) # 16
Importing Modules
Since tasks may execute on remote systems, import statements should be inside the function:
@python_task
def get_hostname():
import platform
return platform.node()
@python_task
def process_data(filename):
import pandas as pd
import numpy as np
df = pd.read_csv(filename)
return np.mean(df['values'])
Return Values
Python tasks can return any serializable Python object:
@python_task
def get_list(n):
return list(range(n))
@python_task
def get_dict(key, value):
return {key: value}
@python_task
def get_dataframe():
import pandas as pd
return pd.DataFrame({'a': [1, 2, 3], 'b': [4, 5, 6]})
list_result = get_list(5, executor=["local"]).result()
dict_result = get_dict("temperature", 72.5, executor=["local"]).result()
df_result = get_dataframe(executor=["compute"]).result()
Bash Tasks
The @bash_task decorator transforms a function into a shell command workflow task.
The function must return a string containing the bash commands to execute.
Basic Usage
from chiltepin.tasks import bash_task
@bash_task
def echo_hello():
return "echo 'Hello from bash!'"
# Bash tasks return the exit code (0 = success)
future = echo_hello(executor=["my-resource"])
exit_code = future.result()
print(exit_code) # 0
Dynamic Command Generation
Use function arguments to dynamically construct commands:
@bash_task
def process_file(input_file, output_file):
return f"cat {input_file} | sort | uniq > {output_file}"
@bash_task
def run_simulation(config_file, num_steps):
return f"./my_simulator --config {config_file} --steps {num_steps}"
exit_code = process_file("data.txt", "sorted.txt", executor=["compute"]).result()
Warning
Be careful with shell injection vulnerabilities. Validate and sanitize inputs when constructing shell commands from user-provided data.
Capturing Output
By default, bash tasks return the exit code. To capture stdout or stderr, use the
stdout and stderr parameters that are automatically added to bash tasks:
@bash_task
def get_hostname():
return "hostname"
# Capture stdout to a file
future = get_hostname(
executor=["compute"],
stdout="hostname_output.txt"
)
exit_code = future.result()
# Read the captured output
with open("hostname_output.txt") as f:
hostname = f.read().strip()
print(f"Task ran on: {hostname}")
Note
When running tasks on remote resources (via Globus Compute endpoints), output files are created on the remote system, not on the local host. You’ll need to use shared filesystems, data staging, or file transfer mechanisms to access these files locally.
You can also capture stderr for debugging:
@bash_task
def risky_command():
return "some_command_that_might_fail"
future = risky_command(
executor=["compute"],
stdout="output.txt",
stderr="errors.txt"
)
Multi-line Commands
For complex bash scripts, return multi-line strings:
@bash_task
def setup_and_run(workdir):
return f"""
mkdir -p {workdir}
cd {workdir}
git clone https://github.com/example/repo.git
cd repo
make
./run_tests.sh
"""
MPI Tasks and Task Geometry
For tasks that run MPI (Message Passing Interface) applications, Chiltepin provides
the chiltepin_task_geometry parameter to specify parallel resource requirements. This parameter
tells the scheduler how many nodes, MPI ranks, and ranks per node your task needs.
What is Task Geometry?
Task geometry defines the parallel execution layout for MPI applications:
num_nodes: Number of compute nodes required for the task
num_ranks: Total number of MPI processes (ranks) to launch
ranks_per_node: Number of MPI ranks to place on each node
The relationship between these values is typically:
num_ranks = num_nodes * ranks_per_node
Basic MPI Task Example
Here’s a simple example of running an MPI application:
from chiltepin.tasks import bash_task
@bash_task
def run_mpi_hello():
return "$PARSL_MPI_PREFIX ./mpi_hello.exe"
# Run on 2 nodes with 8 total ranks (4 ranks per node)
exit_code = run_mpi_hello(
executor=["mpi"],
chiltepin_task_geometry={
"num_nodes": 2,
"num_ranks": 8,
"ranks_per_node": 4
}
).result()
Note
The $PARSL_MPI_PREFIX environment variable is automatically set by Parsl’s MPI
launcher and contains the appropriate mpirun or mpiexec command with the
correct number of processes. Always use this variable when launching MPI applications
to ensure the task geometry is properly applied.
Tip
Advanced MPI Environment Variables: Parsl provides additional environment
variables for MPI tasks that can be useful for advanced applications. These include
launcher-specific variables (e.g., $PARSL_SRUN_PREFIX, $PARSL_MPIEXEC_PREFIX)
and task geometry variables ($PARSL_NUM_RANKS, $PARSL_NUM_NODES,
$PARSL_RANKS_PER_NODE). For a complete reference, see Parsl’s
MPI Apps documentation.
Tip
Combining with parsl_resource_specification: The chiltepin_task_geometry parameter
is a convenience wrapper around Parsl’s parsl_resource_specification. For advanced
use cases, you can provide both parameters. When both are present, Chiltepin merges
them with chiltepin_task_geometry taking precedence for overlapping keys. This allows you to
specify MPI geometry via chiltepin_task_geometry while including additional fields in
parsl_resource_specification if needed.
Compile and Run MPI Code
A common pattern is to compile MPI code on a compute resource, then run it on MPI resources with specific geometry:
from chiltepin.tasks import bash_task
@bash_task
def compile_mpi_code(source_dir):
return f"""
cd {source_dir}
$MPIF90 -o simulation.exe simulation.f90
"""
@bash_task
def run_mpi_simulation(work_dir, input_file):
return f"""
cd {work_dir}
$PARSL_MPI_PREFIX ./simulation.exe {input_file}
"""
# Compile on compute resource
compile_result = compile_mpi_code(
"/path/to/source",
executor=["compute"]
).result()
# Run on MPI resource with specified geometry
sim_result = run_mpi_simulation(
"/path/to/source",
"config.in",
executor=["mpi"],
chiltepin_task_geometry={
"num_nodes": 4,
"num_ranks": 16,
"ranks_per_node": 4
},
stdout="simulation.out",
stderr="simulation.err"
).result()
Different Geometries for Different Tasks
You can run multiple MPI tasks with different geometries, and Parsl will schedule them based on resource availability:
@bash_task
def mpi_task(config):
return f"$PARSL_MPI_PREFIX ./app {config}"
# Small task - 1 node
small = mpi_task(
"small.cfg",
executor=["mpi"],
chiltepin_task_geometry={
"num_nodes": 1,
"num_ranks": 8,
"ranks_per_node": 8
}
)
# Large task - 8 nodes
large = mpi_task(
"large.cfg",
executor=["mpi"],
chiltepin_task_geometry={
"num_nodes": 8,
"num_ranks": 64,
"ranks_per_node": 8
}
)
# Both tasks are submitted and can run concurrently if resources allow
small_result = small.result()
large_result = large.result()
Task Geometry with Python Tasks
While less common, you can also use chiltepin_task_geometry with Python tasks that launch
MPI applications:
from chiltepin.tasks import python_task
@python_task
def run_mpi_analysis(data_file):
import subprocess
import os
import shlex
# Get the MPI prefix from environment
mpi_prefix = os.environ.get("PARSL_MPI_PREFIX", "mpiexec -n 1")
# Parse the MPI prefix safely and construct command as list
mpi_command = shlex.split(mpi_prefix)
command = mpi_command + ["python", "mpi_analysis.py", data_file]
# Run MPI application (shell=False prevents injection attacks)
result = subprocess.run(
command,
capture_output=True
)
return result.returncode
exit_code = run_mpi_analysis(
"dataset.h5",
executor=["mpi"],
chiltepin_task_geometry={
"num_nodes": 2,
"num_ranks": 16,
"ranks_per_node": 8
}
).result()
Configuring MPI Resources
To use task geometry effectively, your configuration file must define MPI-capable resources. See the Configuration documentation for details on setting up MPI resources.
Example configuration snippet:
mpi:
mpi: True
provider: "slurm"
max_mpi_apps: 4
cores_per_node: 8
nodes_per_block: 3
partition: "compute"
walltime: "2:00:00"
Best Practices for MPI Tasks
Always use $PARSL_MPI_PREFIX: This ensures your MPI applications receive the correct number of processes specified in
chiltepin_task_geometry.Match geometry to resource: Ensure your
chiltepin_task_geometrydoesn’t exceed the capabilities defined in your resource configuration.Consider oversubscription: For testing on local systems, you may need to allow oversubscription with an appropriate flag, such as Open MPI’s
--oversubscribeor Slurm’s--overcommit.Capture output appropriately: Use
stdoutandstderrparameters to capture MPI application output for debugging.Test scaling: Start with small geometries and scale up to ensure your application runs correctly at different scales.
Join Tasks
The @join_task decorator creates tasks that coordinate other tasks without blocking
the main workflow. Join tasks can launch multiple subtasks and depend on their results.
Basic Usage
Join tasks call other tasks and return futures:
from chiltepin.tasks import python_task, join_task
@python_task
def multiply(x, factor):
return x * factor
@python_task
def add_values(*values):
return sum(values)
@join_task
def process_list(values, factor):
# Launch multiple tasks in parallel
futures = [multiply(v, factor, executor=["compute"]) for v in values]
# Aggregate results with another task
return add_values(*futures, executor=["compute"])
# Process [1, 2, 3] with factor 2: (1*2) + (2*2) + (3*2) = 12
result = process_list([1, 2, 3], 2).result()
print(result) # 12
When to Use Join Tasks
Use join tasks when you need to:
Fan-out operations: Launch many parallel tasks based on input data
Task dependencies: Chain tasks where one depends on another’s output
Dynamic workflows: Create tasks based on runtime conditions
Example - Processing Multiple Files:
@bash_task
def process_file(filepath):
return f"./process.sh {filepath}"
@python_task
def check_all_success(exit_codes):
return all(code == 0 for code in exit_codes)
@join_task
def process_all_files(file_list):
# Process all files in parallel
futures = [process_file(f, executor=["compute"]) for f in file_list]
# Check if all succeeded
return check_all_success(futures, executor=["local"])
files = ["data1.txt", "data2.txt", "data3.txt"]
success = process_all_files(files).result()
Mixing Task Types
Join tasks can coordinate both python and bash tasks:
@bash_task
def compile_code():
return "gcc -o myapp myapp.c"
@bash_task
def run_app(input_file):
return f"./myapp {input_file}"
@python_task
def parse_results(output_file):
with open(output_file) as f:
return float(f.read().strip())
@join_task
def compile_and_run(input_file):
# First compile
compile_future = compile_code(executor=["compute"])
compile_future.result() # Wait for compilation
# Then run
run_future = run_app(input_file, executor=["compute"], stdout="output.txt")
run_future.result() # Wait for execution
# Parse results
return parse_results("output.txt", executor=["local"])
result = compile_and_run("input.dat").result()
Tasks as Class Methods
All task decorators work with both standalone functions and class methods. This enables object-oriented workflow design:
from chiltepin.tasks import python_task, bash_task
class DataProcessor:
def __init__(self, config):
self.config = config
@python_task
def load_data(self, filename):
import pandas as pd
# Can access self and instance variables
return pd.read_csv(filename, **self.config)
@python_task
def transform_data(self, df):
# Use instance configuration
if self.config.get('normalize'):
return (df - df.mean()) / df.std()
return df
@bash_task
def export_data(self, output_file):
# self is available in method tasks
format_type = self.config.get('export_format', 'csv')
return f"convert_data --format {format_type} -o {output_file}"
# Create instance and use tasks
processor = DataProcessor({'normalize': True, 'export_format': 'json'})
data = processor.load_data("input.csv", executor=["compute"]).result()
transformed = processor.transform_data(data, executor=["compute"]).result()
exit_code = processor.export_data("output.json", executor=["compute"]).result()
Warning
Pickling Limitation: When decorating class methods, the entire class instance
(self) must be picklable (serializable) because it gets sent to remote workers.
Classes that contain unpicklable objects—such as network connections, file handles,
database connections, remote proxies, or framework-specific objects—cannot be used
with task decorators.
Workaround 1 - Standalone Functions: Extract the task logic into a standalone function and pass only picklable data as arguments:
@python_task
def process_data(config, text):
# Task logic using only picklable data
return f"Processed {text} with {config}"
class MyService:
def __init__(self):
self.config = {"param": "value"} # Picklable
self.connection = NetworkConnection() # NOT picklable
def process(self, text):
# Pass only picklable data to the task
return process_data(self.config, text)
Workaround 2 - Helper Class: Create a separate helper class with only picklable state to hold the task methods, while keeping unpicklable objects in the main class:
class TaskBehavior:
"""Helper class with only picklable state for workflow tasks."""
@python_task
def process_data(self, config, text):
# Task logic - self can be empty or contain only picklable data
return f"Processed {text} with {config}"
@python_task
def analyze_result(self, data):
# Another task method
return len(data)
class MyService:
def __init__(self):
self.config = {"param": "value"} # Picklable
self.connection = NetworkConnection() # NOT picklable
# Create helper instance for tasks
self.tasks = TaskBehavior()
def process(self, text):
# Call task through helper instance
future = self.tasks.process_data(self.config, text, executor=["compute"])
return future.result()
def analyze(self, data):
return self.tasks.analyze_result(data, executor=["compute"]).result()
This pattern is especially useful when integrating with frameworks that have unpicklable objects, as it cleanly separates the workflow task definitions from the framework-specific state.
Warning
Mutable Object State: When using class methods as tasks, be aware that mutable
object state can lead to non-deterministic behavior in distributed systems. Each task
captures the object state at the time it’s submitted. If the object’s state is modified
between task submissions (e.g., updating self.config), different tasks may see
different states, leading to unexpected results. For best reliability, use immutable
configuration or pass state explicitly as task arguments rather than relying on
mutable instance variables.
Specifying Resources
The executor parameter determines where a task runs. This parameter refers to
resource names defined in your configuration file.
Note
The parameter is called executor due to Parsl’s API, but it specifies which
resource to use, not an “executor” in the traditional programming sense.
Single Resource
Specify a single resource by name:
@python_task
def my_task():
return "result"
# Run on the "compute" resource from your config
future = my_task(executor=["compute"])
Multiple Resources
Provide a list of resource names to allow Parsl to choose based on availability:
# Can run on either resource
future = my_task(executor=["compute", "backup-compute"])
Default Executor
If you omit the executor parameter, the task can run on any configured resource:
# Can run on any available resource
future = my_task()
Tip
For production workflows, explicitly specify resources to ensure tasks run where intended (e.g., GPU tasks on GPU resources, MPI tasks on MPI resources).
Futures and Results
Task calls return AppFuture objects, which represent asynchronous computation.
Getting Results
Call .result() to wait for task completion and retrieve the result:
@python_task
def compute_value():
import time
time.sleep(2)
return 42
future = compute_value(executor=["compute"])
print("Task submitted, doing other work...")
# This blocks until the task completes
result = future.result()
print(f"Result: {result}")
Checking Status
Check if a task is done without blocking:
future = compute_value(executor=["compute"])
if future.done():
print("Task completed!")
print(future.result())
else:
print("Task still running...")
Multiple Futures
Wait for multiple tasks efficiently:
# Launch multiple tasks
futures = [compute_value(executor=["compute"]) for _ in range(10)]
# Wait for all to complete
results = [f.result() for f in futures]
print(f"All done: {results}")
Exception Handling
Exceptions raised in tasks are re-raised when calling .result():
@python_task
def failing_task():
raise ValueError("Something went wrong!")
future = failing_task(executor=["compute"])
try:
result = future.result()
except ValueError as e:
print(f"Task failed: {e}")
File and Data Handling
Working with Files
Tasks can read and write files, but file paths must be accessible from the resource where the task runs:
@python_task
def process_file(input_path, output_path):
with open(input_path) as f:
data = f.read()
processed = data.upper()
with open(output_path, 'w') as f:
f.write(processed)
return output_path
Warning
When running on remote resources via Globus Compute, ensure files are accessible on the remote system. You may need to stage files or use shared filesystems.
Data Transfer Between Endpoints
For moving data between Globus Transfer endpoints, Chiltepin provides specialized data transfer and deletion tasks that can be incorporated into your workflows:
from chiltepin.data import transfer_task, delete_task
from chiltepin.tasks import python_task
@python_task
def process_file(transfer_complete, input_path):
# transfer_complete is a boolean we can check or ignore
# The important part is passing it creates a dependency
with open(input_path) as f:
data = f.read()
return len(data)
# Transfer data between Globus Transfer endpoints
transfer = transfer_task(
src_ep="my-source-endpoint",
dst_ep="my-dest-endpoint",
src_path="/data/input.dat",
dst_path="/scratch/input.dat",
executor=["local"]
)
# Process the transferred data (waits for transfer by passing its future)
result = process_file(transfer, "/scratch/input.dat", executor=["compute"])
# Clean up after processing
cleanup = delete_task(
src_ep="my-dest-endpoint",
src_path="/scratch/input.dat",
executor=["local"],
inputs=[result] # Waits for processing to complete
)
cleanup.result()
These tasks operate on Globus Transfer endpoints (which are different from Globus Compute endpoints used for execution). See Data Transfer and Management for comprehensive documentation on data transfer and deletion tasks.
Passing Data Between Tasks
Pass data directly through futures:
@python_task
def generate_data(n):
return list(range(n))
@python_task
def sum_data(data):
return sum(data)
# Data flows through futures
data_future = generate_data(100, executor=["compute"])
sum_future = sum_data(data_future, executor=["compute"])
result = sum_future.result()
For large data, consider files or data staging strategies.
Advanced Topics
Environment Variables
Access environment variables in tasks:
@python_task
def get_user():
import os
return os.environ.get('USER', 'unknown')
@bash_task
def get_user_bash():
return "echo ${USER:-unknown}"
Task Dependencies
Create task dependencies to ensure tasks execute in the correct order. There are two primary methods for establishing dependencies between tasks.
Passing Futures as Arguments
The most common approach is to pass a future from one task as an argument to another:
@python_task
def step1():
return "result1"
@python_task
def step2(input_data):
return f"processed_{input_data}"
@python_task
def step3(input_data):
return f"final_{input_data}"
# Chain tasks - data flows through futures
future1 = step1(executor=["compute"])
future2 = step2(future1, executor=["compute"]) # Waits for future1
future3 = step3(future2, executor=["compute"]) # Waits for future2
final_result = future3.result()
Using the inputs Parameter
For dependencies where you don’t need to pass data between tasks, use the inputs
parameter. This is automatically supported by all Chiltepin task decorators
(via Parsl’s underlying implementation):
from chiltepin.data import delete_task, transfer_task
from chiltepin.tasks import python_task
# Stage data to compute resource
stage = transfer_task(
src_ep="laptop",
dst_ep="hpc-scratch",
src_path="/data/input.dat",
dst_path="/scratch/input.dat",
executor=["local"]
)
# Process the data - waits for transfer without passing its result
@python_task
def process_data(filepath):
with open(filepath) as f:
return len(f.read())
result = process_data("/scratch/input.dat", executor=["compute"], inputs=[stage])
# Clean up - waits for processing to complete
cleanup = delete_task(
src_ep="hpc-scratch",
src_path="/scratch/input.dat",
executor=["local"],
inputs=[result]
)
The inputs parameter accepts a list of futures that must complete before the task
starts. This is particularly useful for:
Ensuring files are transferred before processing begins
Coordinating cleanup operations after processing completes
Creating dependencies when you don’t need to pass data between tasks
Coordinating multiple independent prerequisites
Multiple Dependencies
You can combine both approaches and specify multiple dependencies:
@python_task
def task_a():
return "data_a"
@python_task
def task_b():
return "data_b"
@python_task
def task_c():
# Just needs to wait, doesn't use the result
pass
@python_task
def combine(data1, data2):
return f"{data1}_{data2}"
a = task_a(executor=["compute"])
b = task_b(executor=["compute"])
c = task_c(executor=["compute"])
# Combine waits for a and b (via arguments) and c (via inputs)
result = combine(a, b, executor=["compute"], inputs=[c])
Tip
Avoid premature .result() calls: In this example, notice that .result() is only
called once at the very end. By passing futures directly as arguments instead of calling
.result() immediately, you allow Parsl to manage task dependencies automatically and
schedule tasks as soon as their dependencies complete. This maximizes parallelism.
Bad practice (blocks unnecessarily):
result1 = step1(executor=["compute"]).result() # Blocks here
result2 = step2(result1, executor=["compute"]).result() # Blocks here
result3 = step3(result2, executor=["compute"]).result() # Blocks here
Good practice (maximizes parallelism):
future1 = step1(executor=["compute"])
future2 = step2(future1, executor=["compute"]) # Scheduled, doesn't block
future3 = step3(future2, executor=["compute"]) # Scheduled, doesn't block
result = future3.result() # Only block when you need the final result
Timeout Handling
Handle long-running tasks with timeouts:
from concurrent.futures import TimeoutError
@python_task
def long_task():
import time
time.sleep(100)
return "done"
future = long_task(executor=["compute"])
try:
result = future.result(timeout=10) # Wait max 10 seconds
except TimeoutError:
print("Task timed out")
Retry Logic
Implement retry logic for unreliable tasks:
def run_with_retry(task_func, *args, max_retries=3, **kwargs):
for attempt in range(max_retries):
try:
future = task_func(*args, **kwargs)
return future.result()
except Exception as e:
if attempt == max_retries - 1:
raise
print(f"Attempt {attempt + 1} failed: {e}, retrying...")
Best Practices
Keep tasks pure: Avoid side effects when possible. Tasks should transform inputs to outputs predictably.
Import inside tasks: Always import modules inside task functions, not at the module level, to ensure imports work on remote systems.
Specify resources explicitly: Use the
executorparameter to control where tasks run, especially for resource-specific requirements (GPU, MPI, etc.).Handle errors gracefully: Wrap
.result()calls in try-except blocks for production workflows.Use join tasks for coordination: Don’t block the main thread waiting for results. Let join tasks coordinate dependencies.
Validate bash commands: Sanitize inputs when constructing bash commands to avoid shell injection vulnerabilities.
Use descriptive task names: Function names should clearly indicate what the task does for easier debugging.
Common Patterns
Map-Reduce
@python_task
def map_task(item):
return item ** 2
@python_task
def reduce_task(results):
return sum(results)
@join_task
def map_reduce(items):
# Map phase
futures = [map_task(item, executor=["compute"]) for item in items]
# Reduce phase
return reduce_task(futures, executor=["compute"])
result = map_reduce([1, 2, 3, 4, 5]).result() # 55
Pipeline Processing
@python_task
def stage1(data):
return data * 2
@python_task
def stage2(data):
return data + 10
@python_task
def stage3(data):
return data ** 2
# Create pipeline
data1 = stage1(5, executor=["compute"])
data2 = stage2(data1, executor=["compute"])
result = stage3(data2, executor=["compute"]).result() # ((5*2)+10)^2 = 400
Parameter Sweep
@python_task
def run_experiment(param1, param2):
# Run simulation with parameters
result = param1 * param2
return {"params": (param1, param2), "result": result}
# Sweep over parameter space
futures = []
for p1 in [1, 2, 3]:
for p2 in [10, 20, 30]:
future = run_experiment(p1, p2, executor=["compute"])
futures.append(future)
# Collect all results
results = [f.result() for f in futures]
See Also
Quick Start - Quick introduction to tasks in a complete workflow
Configuration - Configuring resources where tasks execute
API Reference - Full API reference including task decorator signatures