# SPDX-License-Identifier: Apache-2.0
import os
import pathlib
import platform
import shutil
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Dict, Optional, Tuple, Union
import psutil
import yaml
# Try to import globus-compute-endpoint (Linux-only on conda-forge)
try:
from globus_compute_endpoint.endpoint.config.utils import get_config
from globus_compute_endpoint.endpoint.endpoint import Endpoint
ENDPOINT_MANAGEMENT_AVAILABLE = True
_ENDPOINT_IMPORT_ERROR = None
except ImportError as e:
ENDPOINT_MANAGEMENT_AVAILABLE = False
_ENDPOINT_IMPORT_ERROR = e
get_config = None
Endpoint = None
from academy.exchange.cloud.login import get_globus_app as get_globus_academy_app
from academy.exchange.cloud.scopes import AcademyExchangeScopes
from globus_compute_sdk import Client
from globus_compute_sdk.sdk.auth.auth_client import ComputeAuthClient
from globus_compute_sdk.sdk.auth.globus_app import (
get_globus_app as get_globus_compute_app,
)
from globus_compute_sdk.sdk.web_client import WebClient
from globus_sdk import ClientApp, GlobusApp, TransferClient, UserApp
from globus_sdk.gare import GlobusAuthorizationParameters
# Text of the user-endpoint-process (UEP) configuration template: a YAML
# document containing Jinja placeholders and conditionals (provider, mpi,
# partition, queue, account, walltime, ...).
# NOTE(review): the code that renders or writes this template is not visible
# in this part of the file -- confirm how it is consumed before editing it.
endpoint_template = """# This is the default user-endpoint-process (UEP) template provided with
# newly-configured endpoints. Endpoints generate a UEP-specific configuration
# by processing this YAML file as a Jinja template against SDK-provided (user)
# variables -- please modify this template to suit your site's requirements.
#
# As an optional security and user-debugging aid, consider also specifying a
# JSON schema for the user-provided variables. If `user_config_schema.json`
# exists within the same directory, then before starting the UEP, the MEP will
# validate the variables against the schema before rendering. This provides
# an administrative peace of mind that users cannot specify invalid arguments.
# From a usability standpoint, however, it also can make invalid values
# prominently visible to users.
#
# For more information, please see the `user_endpoint_config` in Globus Compute
# SDK's Executor.
#
# Some common options site-administrators may want to set:
# - address
# - provider (e.g., SlurmProvider, TorqueProvider, CobaltProvider, etc.)
# - account
# - scheduler_options
# - walltime
# - worker_init
#
# There are a number of example configurations available in the documentation:
# https://globus-compute.readthedocs.io/en/stable/endpoints.html#example-configurations
debug: True
endpoint_setup: {{ endpoint_setup|default() }}
engine:
{% if mpi %}
type: GlobusMPIEngine
max_workers_per_block: {{ max_mpi_apps|default(1) }}
{% if provider == '"slurm"' %}
{% set default_mpi_launcher = "srun" %}
{% else %}
{% set default_mpi_launcher = "mpiexec" %}
{% endif %}
mpi_launcher: {{ mpi_launcher|default(default_mpi_launcher) }}
{% else %}
type: GlobusComputeEngine
{% endif %}
run_in_sandbox: True
provider:
{% if provider == '"slurm"' %}
type: SlurmProvider
{% elif provider == '"pbspro"' %}
type: PBSProProvider
{% else %}
type: LocalProvider
{% endif %}
launcher:
{% if mpi %}
type: SimpleLauncher
{% else %}
{% if provider == '"slurm"' %}
type: SrunLauncher
{% elif provider == '"pbspro"' %}
type: MpiExecLauncher
{% else %}
type: SingleNodeLauncher
{% endif %}
{% endif %}
init_blocks: {{ init_blocks|default(0) }}
min_blocks: {{ min_blocks|default(0) }}
max_blocks: {{ max_blocks|default(1) }}
worker_init: {{ worker_init|default() }}
{% if provider != '"localhost"' %}
{% if not mpi %}
{% if provider == '"slurm"' %}
cores_per_node: {{ cores_per_node|default(1) }}
{% elif provider == '"pbspro"' %}
cpus_per_node: {{ cores_per_node|default(1) }}
{% endif %}
{% endif %}
nodes_per_block: {{ nodes_per_block|default(1) }}
{% if provider == '"slurm"' %}
exclusive: {{ exclusive|default("True") }}
partition: {{ partition|default() }}
qos: {{ queue|default() }}
{% elif provider == '"pbspro"' %}
queue: {{ queue|default() }}
{% endif %}
account: {{ account|default() }}
walltime: {{ walltime|default("00:10:00") }}
{% endif %}
# Endpoints will be restarted when a user submits new tasks to the
# web-services, so eagerly shut down if endpoint is idle. At 30s/hb (default
# value), 10 heartbeats is 300s.
idle_heartbeats_soft: 120
# If endpoint is *apparently* idle (e.g., outstanding tasks, but no movement)
# for this many heartbeats, then shutdown anyway. At 30s/hb (default value),
# 5,760 heartbeats == "48 hours". (Note that this value will be ignored if
# idle_heartbeats_soft is 0 or not set.)
idle_heartbeats_hard: 5760
"""
# UUID of the default Chiltepin thick client. get_chiltepin_apps() falls back
# to this value when $GLOBUS_COMPUTE_CLIENT_ID is not set in the environment.
CHILTEPIN_CLIENT_UUID = "42e9e804-0bcd-4c3d-881b-8e270e3c2163"
def _check_endpoint_management_available():
    """Verify that endpoint management can be used in this environment.

    The operating system is checked first; only on Linux is the result of the
    optional ``globus-compute-endpoint`` import consulted.

    Raises
    ------
    NotImplementedError
        If the current platform is not Linux
    ImportError
        If the globus-compute-endpoint package could not be imported. The
        original import failure is attached as the cause.
    """
    running_on_linux = platform.system() == "Linux"
    if not running_on_linux:
        unsupported_msg = (
            "Endpoint management is only supported on Linux.\n\n"
            "macOS users: All features except endpoint management are available. "
            "Use the Docker container for full functionality including endpoint management.\n\n"
            "Windows users: Native Windows is not supported. "
            "Use the Docker container or WSL2 with a Linux distribution."
        )
        raise NotImplementedError(unsupported_msg)

    # Nothing more to check when the optional import succeeded
    if ENDPOINT_MANAGEMENT_AVAILABLE:
        return

    missing_pkg_msg = (
        "Endpoint management requires the 'globus-compute-endpoint' package, "
        "which is a Linux-only dependency. If you are on Linux, please install "
        "'globus-compute-endpoint' to use endpoint management features."
    )
    raise ImportError(missing_pkg_msg) from _ENDPOINT_IMPORT_ERROR
def get_chiltepin_apps() -> Tuple[GlobusApp, GlobusApp, GlobusApp]:
    """Build the Globus Apps for Compute, Transfer, and the Academy Exchange

    The client id and secret are read from $GLOBUS_COMPUTE_CLIENT_ID and
    $GLOBUS_COMPUTE_CLIENT_SECRET. When no client id is set, the default
    Chiltepin thick client id is used and exported to
    $GLOBUS_COMPUTE_CLIENT_ID. When a secret is set, the id and secret are
    also exported to the $GLOBUS_CLI_* and $ACADEMY_GLOBUS_* variables, and
    the transfer app is a ClientApp; otherwise the transfer app is a UserApp.

    No login or logout flow is started here; login(), login_required() and
    logout() call this function and act on the returned apps.

    Returns
    -------
    Tuple[GlobusApp, GlobusApp, GlobusApp]
        The compute app, the transfer app, and the academy app, in that order

    Raises
    ------
    RuntimeError
        If $GLOBUS_COMPUTE_CLIENT_SECRET is set without
        $GLOBUS_COMPUTE_CLIENT_ID
    """
    env = os.environ
    app_id = env.get("GLOBUS_COMPUTE_CLIENT_ID")
    app_secret = env.get("GLOBUS_COMPUTE_CLIENT_SECRET")

    # A secret is useless without the id it belongs to
    if app_secret and not app_id:
        raise RuntimeError(
            "$GLOBUS_COMPUTE_CLIENT_SECRET is set but $GLOBUS_COMPUTE_CLIENT_ID is not"
        )

    # Mirror the credentials into the Globus CLI and Academy variables.
    # NOTE: the *_CLIENT_ID variables should only be set together with the
    # matching *_CLIENT_SECRET, hence this happens only when a secret exists.
    if app_secret:
        env.update(
            {
                "GLOBUS_CLI_CLIENT_ID": app_id,
                "GLOBUS_CLI_CLIENT_SECRET": app_secret,
                "ACADEMY_GLOBUS_CLIENT_ID": app_id,
                "ACADEMY_GLOBUS_CLIENT_SECRET": app_secret,
            }
        )

    # Fall back to the default Chiltepin thick client
    if not app_id:
        app_id = CHILTEPIN_CLIENT_UUID
        env["GLOBUS_COMPUTE_CLIENT_ID"] = app_id

    # The compute app is selected from GLOBUS_COMPUTE_CLIENT_ID and
    # GLOBUS_COMPUTE_CLIENT_SECRET, so it must be created only after the
    # environment has been finalized above.
    compute_scopes = {
        WebClient.scopes.resource_server: WebClient.default_scope_requirements,
        ComputeAuthClient.scopes.resource_server: ComputeAuthClient.default_scope_requirements,  # noqa E501
    }
    compute_app = get_globus_compute_app()
    compute_app.add_scope_requirements(compute_scopes)

    # Service client credentials get a ClientApp, user credentials a UserApp
    transfer_app = (
        ClientApp("chiltepin", client_id=app_id, client_secret=app_secret)
        if app_secret
        else UserApp("chiltepin", client_id=app_id)
    )

    # App used to authenticate with the Academy Agent Exchange
    academy_scopes = {
        AcademyExchangeScopes.resource_server: [
            AcademyExchangeScopes.academy_exchange,
        ],
    }
    academy_app = get_globus_academy_app()
    academy_app.add_scope_requirements(academy_scopes)

    return (compute_app, transfer_app, academy_app)
def login() -> Dict[str, Union[Client, TransferClient]]:
    """Log in to the Chiltepin app

    Clients for Globus Compute and Globus Transfer are created from the apps
    returned by get_chiltepin_apps(), then a login flow is started for each
    of the compute, transfer, and academy apps that reports a login is
    required. The client id and/or secret come from the environment when
    set, otherwise the registered Chiltepin thick client is used.

    Returns
    -------
    Dict[str, Client | TransferClient]
        The Globus Compute client under key "compute" and the Globus
        Transfer client under key "transfer"
    """
    compute_app, transfer_app, academy_app = get_chiltepin_apps()

    # Build the clients up front; no Academy client instance is needed
    clients = {
        "compute": Client(app=compute_app),
        "transfer": TransferClient(app=transfer_app),
    }
    # (disabled) transfer_client.add_app_data_access_scope("d75f3e86-df3c-4734-8b9d-f182346b4bbd")

    # Compute login
    if compute_app.login_required():
        compute_app.login()

    # Transfer login, with additional authorization parameters
    if transfer_app.login_required():
        transfer_auth = GlobusAuthorizationParameters(
            session_required_single_domain=["rdhpcs.noaa.gov"],
            prompt="login",
        )
        transfer_app.login(auth_params=transfer_auth)

    # Academy login
    if academy_app.login_required():
        academy_app.login()

    return clients
def login_required() -> bool:
    """Report whether any of the Chiltepin Globus apps needs a login

    The compute, transfer, and academy apps are asked in that order, and the
    check stops at the first app that requires a login.

    Returns
    -------
    bool
        True if at least one app requires a login, otherwise False
    """
    # get_chiltepin_apps() returns (compute, transfer, academy)
    return any(app.login_required() for app in get_chiltepin_apps())
def logout():
    """Log out of the Chiltepin app

    Calls logout() on the Globus Compute app, the Globus Transfer app, and
    the Academy app, in that order.
    """
    # get_chiltepin_apps() returns (compute, transfer, academy)
    for app in get_chiltepin_apps():
        app.logout()
def show(
    config_dir: Optional[str] = None,
) -> Dict[str, Dict[str, Optional[str]]]:
    """List the configured Globus Compute Endpoints

    Parameters
    ----------

    config_dir: str | None
        Directory holding the endpoint configurations. When None (the
        default) or empty, $HOME/.globus_compute is used

    Returns
    -------

    Dict[str, Dict[str, Optional[str]]]
        Endpoint information as returned by Endpoint.get_endpoints(), keyed
        by endpoint name
    """
    _check_endpoint_management_available()

    if config_dir:
        endpoints_root = Path(config_dir)
    else:
        endpoints_root = Path.home() / ".globus_compute"

    return Endpoint.get_endpoints(endpoints_root)
def exists(
    name: str,
    config_dir: Optional[str] = None,
) -> bool:
    """Tell whether an endpoint with the given name is configured

    Parameters
    ----------

    name: str
        Endpoint name to look for

    config_dir: str | None
        Directory holding the endpoint configurations. When None (the
        default), $HOME/.globus_compute is used

    Returns
    -------

    bool
        True if the name appears in the listing from show(), otherwise False
    """
    known_endpoints = show(config_dir)
    return name in known_endpoints
def is_running(
    name: str,
    config_dir: Optional[str] = None,
) -> bool:
    """Tell whether the named endpoint is currently running

    Parameters
    ----------

    name: str
        Endpoint name to look for

    config_dir: str | None
        Directory holding the endpoint configurations. When None (the
        default), $HOME/.globus_compute is used

    Returns
    -------

    bool
        True if the endpoint's "status" in the listing from show() is
        "Running"; False otherwise, including when the endpoint is unknown
    """
    # An unknown endpoint yields an empty record, hence a status of None
    record = show(config_dir).get(name, {})
    status = record.get("status", None)
    return status == "Running"
def start(
    name: str,
    config_dir: Optional[str] = None,
    timeout: Optional[float] = None,
):
    """Start the specified Globus Compute Endpoint

    This is a thin wrapper around the globus-compute-endpoint start command.
    The command is launched as a detached daemon and this function then polls
    once per second until the endpoint reports a "Running" status.

    Parameters
    ----------

    name: str
        Name of the endpoint to start

    config_dir: str | None
        Path to endpoint configuration directory where endpoint information
        is stored. If None (the default), then $HOME/.globus_compute is used

    timeout: float | None
        Number of seconds to wait for the endpoint to start before timing out
        Default is None, meaning the wait will never time out.

    Raises
    ------

    NotImplementedError, ImportError
        If endpoint management is not available on this platform

    RuntimeError
        If a Chiltepin login is required, or if the launched command wrote
        anything to stderr before the endpoint reached the "Running" state

    TimeoutError
        If the endpoint is not running once ``timeout`` seconds have elapsed
    """
    _check_endpoint_management_available()

    # Make sure we are logged in
    if login_required():
        raise RuntimeError("Chiltepin login is required")

    # Build the globus-compute-endpoint command to run
    command = ["globus-compute-endpoint"]
    if config_dir:
        command.append("-c")
        command.append(f"{os.path.abspath(config_dir)}")
    command.append("start")
    command.append(name)

    # Create a temporary file to capture initial stderr for failure detection
    temp_stderr = tempfile.NamedTemporaryFile(
        mode="w+", prefix=f"chiltepin_start_{name}_", suffix=".err", delete=False
    )
    temp_stderr_path = temp_stderr.name
    temp_stderr.close()

    try:
        # Run the command as a detached daemon process using double-fork
        # to completely disconnect from the parent process tree.
        # NOTE: subprocess.Popen with start_new_session=True does not work to
        # fully detach the process because globus-compute-endpoint uses psutil
        # to manage subprocesses, and psutil requires the parent process to still
        # be alive to avoid orphaning the child processes it creates. The double-fork
        # method is used here to create a new session and then immediately exit
        # the first child, leaving the grandchild process running as a daemon that
        # is not a child of the original parent process.
        pid = os.fork()
        if pid == 0:  # pragma: no cover
            # Everything below runs in a forked copy of the caller (untestable).
            # A forked copy must never return or raise into the caller's stack,
            # otherwise two copies of the calling program would keep running,
            # so every path ends in os._exit().
            exit_code = 1
            try:
                # First child - create new session
                os.setsid()
                if os.fork() == 0:
                    # Second child (grandchild) - this becomes the daemon
                    exit_code = 127
                    # Redirect stderr to the temp file first, so that any later
                    # failure (including a failed exec) is visible to the parent
                    stderr_fd = os.open(
                        temp_stderr_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND
                    )
                    os.dup2(stderr_fd, 2)
                    if stderr_fd > 2:
                        os.close(stderr_fd)
                    # Redirect stdin and stdout to /dev/null
                    devnull = os.open(os.devnull, os.O_RDWR)
                    os.dup2(devnull, 0)
                    os.dup2(devnull, 1)
                    if devnull > 2:
                        os.close(devnull)
                    # Execute the endpoint command; only returns on failure
                    os.execvp(command[0], command)
                else:
                    # First child exits immediately
                    exit_code = 0
            except BaseException as exc:
                # Report the failure on stderr (the temp file, once redirected)
                try:
                    os.write(
                        2,
                        f"Failed to launch {command[0]!r}: {exc!r}\n".encode(
                            "utf-8", "replace"
                        ),
                    )
                except OSError:
                    pass
            finally:
                os._exit(exit_code)

        # Parent waits for first child to exit
        os.waitpid(pid, 0)

        # Wait for endpoint to enter "Running" state
        start_time = time.time()
        while True:
            # Enforce the overall timeout, if one was requested
            if timeout is not None:
                elapsed = time.time() - start_time
                if elapsed > timeout:
                    # Check for error output before timing out
                    error_msg = _read_startup_errors(temp_stderr_path)
                    timeout_msg = f"Timeout of {timeout}s exceeded while waiting for endpoint '{name}' to start"
                    if error_msg:
                        raise TimeoutError(
                            f"{timeout_msg}\n\nStartup errors:\n{error_msg}"
                        )
                    raise TimeoutError(timeout_msg)

            # Done as soon as the endpoint reports that it is running
            if is_running(name, config_dir):
                break

            # Check for errors immediately - if the endpoint failed, report it
            error_msg = _read_startup_errors(temp_stderr_path)
            if error_msg:
                raise RuntimeError(
                    f"Endpoint '{name}' failed to start. Error output:\n{error_msg}"
                )

            time.sleep(1)
    finally:
        # Clean up temporary error file (best effort)
        try:
            os.unlink(temp_stderr_path)
        except OSError:
            pass
def _read_startup_errors(stderr_path: str, max_size: int = 10240) -> str:
    """Read initial error output from endpoint startup.

    The file is decoded with the platform default text encoding; bytes that
    cannot be decoded are replaced rather than raising, so malformed output
    from the endpoint process can never mask the startup failure itself.

    Parameters
    ----------

    stderr_path : str
        Path to the temporary stderr file

    max_size : int
        Maximum number of characters to read from the file

    Returns
    -------

    str
        Error content with surrounding whitespace stripped, or an empty
        string if the file is empty, missing, or cannot be read
    """
    try:
        # errors="replace": UnicodeDecodeError is a ValueError, not an
        # OSError, so it would otherwise escape the handler below
        with open(stderr_path, "r", errors="replace") as f:
            return f.read(max_size).strip()
    except OSError:
        # Best effort: a missing or unreadable file means "no errors seen"
        return ""
def stop(
    name: str,
    config_dir: Optional[str] = None,
    timeout: Optional[float] = None,
):
    """Stop the specified Globus Compute Endpoint

    The endpoint is asked to stop through Endpoint.stop_endpoint(), with one
    retry if that call raises psutil.TimeoutExpired. This function then polls
    once per second until the endpoint no longer reports a "Running" status.

    Parameters
    ----------

    name: str
        Name of the endpoint to stop

    config_dir: str | None
        Directory holding the endpoint configurations. When None (the
        default), $HOME/.globus_compute is used

    timeout: float | None
        Number of seconds, measured from just before the stop request, to
        wait for the endpoint to stop. The default of None waits forever.

    Raises
    ------

    RuntimeError
        If a Chiltepin login is required

    TimeoutError
        If more than ``timeout`` seconds have elapsed when the wait loop
        checks the clock
    """
    _check_endpoint_management_available()

    # Refuse to continue without a valid login
    if login_required():
        raise RuntimeError("Chiltepin login is required")

    # Locate the configuration directory of this endpoint
    base_dir = Path(config_dir) if config_dir else Path.home() / ".globus_compute"
    endpoint_dir = base_dir / name

    def _request_stop():
        Endpoint.stop_endpoint(endpoint_dir, get_config(endpoint_dir), remote=False)

    # The clock covers both the stop request and the wait loop below
    began = time.time()
    try:
        _request_stop()
    except psutil.TimeoutExpired:
        # An endpoint in a bad state may not stop within the expected time,
        # so give it exactly one more attempt.
        _request_stop()

    # Poll until the endpoint stops reporting "Running"
    while True:
        if timeout is not None and time.time() - began > timeout:
            raise TimeoutError(
                f"Timeout of {timeout}s exceeded while waiting for endpoint '{name}' to stop"
            )
        if not is_running(name, config_dir):
            return
        time.sleep(1)
def delete(
    name: str,
    config_dir: Optional[str] = None,
    timeout: Optional[float] = None,
):
    """Delete the specified Globus Compute Endpoint

    The endpoint is removed through Endpoint.delete_endpoint(). If its
    configuration cannot be loaded, the deletion is forced. This function
    then polls once per second until the endpoint is gone from the listing.

    Parameters
    ----------

    name: str
        Name of the endpoint to delete

    config_dir: str | None
        Directory holding the endpoint configurations. When None (the
        default), $HOME/.globus_compute is used

    timeout: float | None
        Number of seconds, measured from just before the configuration is
        loaded, to wait for the endpoint to disappear. The default of None
        waits forever.

    Raises
    ------

    RuntimeError
        If a Chiltepin login is required, or if the deletion itself fails

    TimeoutError
        If more than ``timeout`` seconds have elapsed when the wait loop
        checks the clock
    """
    _check_endpoint_management_available()

    # Refuse to continue without a valid login
    if login_required():
        raise RuntimeError("Chiltepin login is required")

    # Locate the configuration directory of this endpoint
    base_dir = Path(config_dir) if config_dir else Path.home() / ".globus_compute"
    endpoint_dir = base_dir / name

    began = time.time()

    # Load the endpoint config; without one the deletion has to be forced
    try:
        loaded_config = get_config(endpoint_dir)
    except Exception:
        loaded_config, must_force = None, True
    else:
        must_force = False

    # Remove the endpoint
    try:
        Endpoint.delete_endpoint(
            endpoint_dir, loaded_config, force=must_force, ep_uuid=None
        )
    except Exception as err:
        raise RuntimeError("Error deleting endpoint") from err

    # Poll until the endpoint is no longer listed
    while True:
        if timeout is not None and time.time() - began > timeout:
            raise TimeoutError(
                f"Timeout of {timeout}s exceeded while waiting for endpoint '{name}' to be deleted"
            )
        if not exists(name, config_dir):
            return
        time.sleep(1)