Source code for chiltepin.workflow

# SPDX-License-Identifier: Apache-2.0

"""Workflow context managers for Chiltepin.

This module provides context managers that wrap Parsl configuration and lifecycle
management, eliminating the need for users to directly import or interact with Parsl.
"""

import logging
import sys
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import parsl
from globus_compute_sdk import Client

from chiltepin import configure

# Module-level logger for cleanup warnings
_logger = logging.getLogger(__name__)


[docs]@contextmanager def run_workflow( config: Union[str, Path, Dict[str, Any]], *, include: Optional[List[str]] = None, run_dir: Optional[str] = None, client: Optional[Client] = None, log_file: Optional[str] = None, log_level: Optional[int] = None, ): """Context manager for Chiltepin workflows. This wraps Parsl configuration and lifecycle management, eliminating the need for users to directly import or interact with Parsl. Parameters ---------- config : str, Path, or dict Either a path to a YAML configuration file or a configuration dictionary include : list of str, optional List of resource labels to load. If None, all resources are loaded. Note: The default "local" resource is always available, regardless of this parameter. run_dir : str, optional Directory for Parsl runtime files. If None, uses Parsl's default. client : globus_compute_sdk.Client, optional Globus Compute client for Globus Compute resources. If None, one will be created automatically if needed. log_file : str, optional Path to Parsl log file. If None, no file logging is configured. log_level : int, optional Logging level (e.g., logging.DEBUG). Only used if log_file is provided. Yields ------ None The context manager doesn't yield anything. Tasks can be submitted within the context. Examples -------- From a configuration file: >>> from chiltepin import run_workflow >>> from chiltepin.tasks import python_task >>> >>> @python_task >>> def my_task(): ... return "Hello from workflow!" >>> >>> with run_workflow("config.yaml"): ... result = my_task(executor=["compute"]) ... print(result.result()) From a configuration dictionary: >>> config = {"laptop": {"provider": "localhost", "cores_per_node": 4}} >>> with run_workflow(config): ... result = my_task(executor=["laptop"]) ... print(result.result()) With logging and selective resources: >>> import logging >>> with run_workflow("config.yaml", include=["compute"], ... log_file="workflow.log", log_level=logging.DEBUG): ... # "compute" resource from config is available ... result = my_task(executor=["compute"]) ... # "local" resource is also always available ... local_result = my_task(executor=["local"]) """ # Parse config if it's a file path if isinstance(config, (str, Path)): config_dict = configure.parse_file(str(config)) else: config_dict = config # Set up logging if requested logger_handler = None if log_file is not None: import logging as log_module level = log_level if log_level is not None else log_module.INFO logger_handler = parsl.set_file_logger(filename=log_file, level=level) # Initialize dfk to None before attempting to load dfk = None try: # Load configuration parsl_config = configure.load( config_dict, include=include, client=client, run_dir=run_dir, ) # Load Parsl with the configuration dfk = parsl.load(parsl_config) yield finally: # Check if we're cleaning up during exception handling # If so, don't mask the user's exception with cleanup exceptions user_exception = sys.exc_info()[0] is not None cleanup_exception = None cleanup_tb = None # Preserve original traceback # Attempt all cleanup operations, catching exceptions if dfk is not None: try: dfk.cleanup() except Exception as e: if user_exception: _logger.warning( "Exception during dfk.cleanup() while handling user exception", exc_info=True, ) else: cleanup_exception = e cleanup_tb = sys.exc_info()[2] # Always call parsl.clear() try: parsl.clear() except Exception as e: if user_exception: _logger.warning( "Exception during parsl.clear() while handling user exception", exc_info=True, ) elif cleanup_exception is None: cleanup_exception = e cleanup_tb = sys.exc_info()[2] else: e.__cause__ = cleanup_exception cleanup_exception = e cleanup_tb = sys.exc_info()[2] # Cleanup logger handler if logger_handler is not None: try: logger_handler() except Exception as e: if user_exception: _logger.warning( "Exception during logger cleanup while handling user exception", exc_info=True, ) elif cleanup_exception is None: cleanup_exception = e cleanup_tb = sys.exc_info()[2] else: e.__cause__ = cleanup_exception cleanup_exception = e cleanup_tb = sys.exc_info()[2] # Only raise cleanup exceptions if there wasn't a user exception # Preserve the original traceback so the error location is clear if cleanup_exception is not None and not user_exception: raise cleanup_exception.with_traceback(cleanup_tb)
# Convenience aliases for clarity
[docs]@contextmanager def run_workflow_from_file( config_file: Union[str, Path], **kwargs, ): """Context manager for workflows using a YAML configuration file. This is an alias for run_workflow() that makes it explicit that a file path is expected. Parameters ---------- config_file : str or Path Path to YAML configuration file **kwargs Additional arguments passed to run_workflow() Examples -------- >>> from chiltepin import run_workflow_from_file >>> from chiltepin.tasks import python_task >>> >>> @python_task >>> def my_task(): ... return 42 >>> >>> with run_workflow_from_file("my_config.yaml", include=["compute"]): ... result = my_task(executor=["compute"]) ... # "local" resource is also always available ... local_result = my_task(executor=["local"]) ... print(result.result()) """ with run_workflow(config_file, **kwargs): yield
[docs]@contextmanager def run_workflow_from_dict( config_dict: Dict[str, Any], **kwargs, ): """Context manager for workflows using a configuration dictionary. This is an alias for run_workflow() that makes it explicit that a dict is expected. Parameters ---------- config_dict : dict Configuration dictionary **kwargs Additional arguments passed to run_workflow() Examples -------- >>> from chiltepin import run_workflow_from_dict >>> from chiltepin.tasks import python_task >>> >>> @python_task >>> def my_task(): ... return 42 >>> >>> config = {"laptop": {"provider": "localhost"}} >>> with run_workflow_from_dict(config): ... result = my_task(executor=["laptop"]) ... # "local" resource is also always available ... local_result = my_task(executor=["local"]) ... print(result.result()) """ with run_workflow(config_dict, **kwargs): yield
__all__ = [ "run_workflow", "run_workflow_from_file", "run_workflow_from_dict", ]