Source code for chiltepin.workflow

# SPDX-License-Identifier: Apache-2.0

"""Workflow context managers for Chiltepin.

This module provides context managers that wrap Parsl configuration and lifecycle
management, eliminating the need for users to directly import or interact with Parsl.
"""

import logging
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import parsl
from globus_compute_sdk import Client

from chiltepin import configure

# Module-level logger for cleanup warnings
_logger = logging.getLogger(__name__)


[docs]class Workflow: """Workflow context manager for Chiltepin. Can be used as a context manager or with explicit start/cleanup calls. This wraps Parsl configuration and lifecycle management, eliminating the need for users to directly import or interact with Parsl. Parameters ---------- config : str, Path, or dict Either a path to a YAML configuration file or a configuration dictionary include : list of str, optional List of resource labels to load. If None, all resources are loaded. Note: The default "local" resource is always available, regardless of this parameter. run_dir : str, optional Directory for Parsl runtime files. If None, uses Parsl's default. client : globus_compute_sdk.Client, optional Globus Compute client for Globus Compute resources. If None, one will be created automatically if needed. log_file : str, optional Path to Parsl log file. If None, no file logging is configured. log_level : int, optional Logging level (e.g., logging.DEBUG). Only used if log_file is provided. Examples -------- As a context manager: >>> from chiltepin import Workflow >>> from chiltepin.tasks import python_task >>> >>> @python_task >>> def my_task(): ... return "Hello!" >>> >>> with Workflow("config.yaml") as dfk: ... result = my_task() ... print(result.result()) With explicit start/cleanup (for use across methods): >>> workflow = Workflow("config.yaml") >>> dfk = workflow.start() >>> # ... do work, potentially in other methods ... >>> workflow.cleanup() """ def __init__( self, config: Union[str, Path, Dict[str, Any]], *, include: Optional[List[str]] = None, run_dir: Optional[str] = None, client: Optional[Client] = None, log_file: Optional[str] = None, log_level: Optional[int] = None, ): """Initialize workflow configuration (does not start workflow yet).""" # Parse config if isinstance(config, (str, Path)): self.config_dict = configure.parse_file(str(config)) else: self.config_dict = config self.include = include self.run_dir = run_dir self.client = client self.log_file = log_file self.log_level = log_level self.dfk = None self.logger_handler = None
[docs] def start(self): """Start the workflow and return DataFlowKernel. Returns ------- DataFlowKernel The Parsl DataFlowKernel instance. Can be used to access workflow state (e.g., dfk.tasks) or for advanced operations. Raises ------ RuntimeError If workflow is already started. """ if self.dfk is not None: raise RuntimeError("Workflow already started. Call cleanup() first.") try: # Set up logging if requested if self.log_file is not None: import logging as log_module level = ( self.log_level if self.log_level is not None else log_module.INFO ) self.logger_handler = parsl.set_file_logger( filename=self.log_file, level=level ) # Load configuration parsl_config = configure.load( self.config_dict, include=self.include, client=self.client, run_dir=self.run_dir, ) # Load Parsl self.dfk = parsl.load(parsl_config) return self.dfk except Exception: # Best-effort cleanup of any partial initialization # This ensures explicit start()/cleanup() usage is as safe as context manager self.cleanup(suppress_exceptions=True) raise
[docs] def cleanup(self, suppress_exceptions=False): """Cleanup the workflow and release resources. This should be called when the workflow is complete. If using as a context manager, this is called automatically. Parameters ---------- suppress_exceptions : bool If True, exceptions during cleanup are logged but not raised. This is used when cleaning up after a user exception to ensure the user's exception is not masked by cleanup exceptions. Notes ----- All cleanup operations are attempted even if some fail. If multiple cleanup operations raise exceptions and suppress_exceptions is False, they are chained together using __cause__, with the last exception being raised. """ cleanup_exception = None # Attempt dfk.cleanup() if self.dfk is not None: try: self.dfk.cleanup() except Exception as e: if suppress_exceptions: _logger.warning( "Exception during dfk.cleanup() (suppressing)", exc_info=True, ) else: _logger.warning("Exception during dfk.cleanup()", exc_info=True) cleanup_exception = e # Always attempt parsl.clear() try: parsl.clear() except Exception as e: if suppress_exceptions: _logger.warning( "Exception during parsl.clear() (suppressing)", exc_info=True, ) else: _logger.warning("Exception during parsl.clear()", exc_info=True) if cleanup_exception is None: cleanup_exception = e else: # Chain this exception to the previous one e.__cause__ = cleanup_exception cleanup_exception = e # Always attempt logger cleanup if self.logger_handler is not None: try: self.logger_handler() except Exception as e: if suppress_exceptions: _logger.warning( "Exception during logger cleanup (suppressing)", exc_info=True, ) else: _logger.warning("Exception during logger cleanup", exc_info=True) if cleanup_exception is None: cleanup_exception = e else: # Chain this exception to the previous one e.__cause__ = cleanup_exception cleanup_exception = e # Only reset state if cleanup fully succeeded # If cleanup failed, preserve state for debugging even if suppressing exceptions if cleanup_exception is None: self.dfk = None self.logger_handler = None # Raise the final exception if any occurred and not suppressing if cleanup_exception is not None and not suppress_exceptions: raise cleanup_exception
def __enter__(self): """Context manager entry - starts the workflow. If start() fails, it performs its own cleanup before re-raising. """ return self.start() def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit - cleans up the workflow.""" # Suppress cleanup exceptions if user exception occurred user_exception = exc_type is not None self.cleanup(suppress_exceptions=user_exception) return False # Don't suppress user exceptions
__all__ = [ "Workflow", ]