Source code for jasmin_arc.arc_interface

import json
import os
import shutil
import subprocess
import sys
import tempfile

import arc
from jinja2 import Environment, PackageLoader, select_autoescape

from config import ConnectionConfig
from constants import JobStatuses, ARC_STATUS_MAPPING, LogLevels
from exceptions import (InvalidConfigError, ProxyGenerationError, InvalidJobDescription,
                        JobSubmissionError, NoTargetsAvailableError, JobNotFoundError,
                        InputFileError)

# Name of the directory, relative to this package, holding the Jinja2 templates used by
# ArcInterface. It is handed to jinja2's PackageLoader, and both the JSDL job description
# ("job_template.xml") and the ARC client config ("arc_config.ini") are loaded from it.
TEMPLATES_DIR = "templates"

class ArcInterface(object):
    """
    Class to handle interactions with the ARC-CE server

    Uses the ``arc`` python bindings to submit, query, cancel and retrieve the outputs of
    jobs, and calls the ``arcproxy`` executable to create and renew the proxy certificate.
    """

    def __init__(self, config_path=None, log=sys.stdout, log_level=LogLevels.INFO):
        """
        Create an object to interface with the ARC server.

        :param config_path: Path to config JSON file, or ``None`` to use the default settings
        :param log:         File-like object to write log messages to, or ``None`` to disable
                            logging. Use ``sys.stdout`` or ``sys.stderr`` to print messages
                            (default: ``sys.stdout``).
        :param log_level:   The level of detail logs should show (default: `LogLevels.INFO`).
                            See `LogLevels` for the available levels

        :raises InvalidConfigError: if config is not valid JSON or is otherwise invalid
        """
        self.logger = arc.Logger(arc.Logger_getRootLogger(), "jobsubmit")

        # Add a log destination if the user has provided one
        if log:
            log_dest = arc.LogStream(log)
            log_dest.setFormat(arc.ShortFormat)
            arc.Logger_getRootLogger().addDestination(log_dest)
            arc.Logger_getRootLogger().setThreshold(log_level.value)

        config_dict = {}
        if config_path:
            self.logger.msg(arc.DEBUG, "Using jasmin_arc config: {}".format(config_path))
            try:
                # Errors opening/reading the file are left to bubble up to calling code
                with open(config_path) as config_file:
                    config_dict = json.load(config_file)
            except ValueError as e:
                # JSON parsing errors. str(e) is used rather than e.message, which only
                # exists on Python 2 exceptions
                raise InvalidConfigError(str(e))

        self.config = ConnectionConfig(config_dict, logger=self.logger)

        # Create jinja2 environment for loading JSDL template(s)
        self.env = Environment(loader=PackageLoader(__name__, TEMPLATES_DIR),
                               autoescape=select_autoescape(["xml"]))

        # Created lazily on the first call to get_user_config
        self.cached_user_config = None

    def submit_job(self, executable, args=None, input_files=None):
        """
        Submit a job and return the job ID

        :param executable:  The command to run on the LOTUS cluster
        :param args:        List of arguments to pass to the executable (default: no
                            arguments)
        :param input_files: A list of paths to local files to copy to the remote session
                            directory (the directory the job will run from on JASMIN)
                            (default: no files)

        :raises InputFileError:          if any of the specified input files do not exist or
                                         are directories
        :raises NoTargetsAvailableError: if no execution targets can be found on the ARC
                                         server
        :raises JobSubmissionError:      if the job cannot be submitted to any targets

        :return: Job ID
        """
        # None sentinels instead of mutable default arguments
        if args is None:
            args = []
        if input_files is None:
            input_files = []

        endpoint = arc.Endpoint(self.config.ARC_SERVER, arc.Endpoint.COMPUTINGINFO)
        user_config = self.get_user_config()

        # Get the ExecutionTargets of this ComputingElement
        retriever = arc.ComputingServiceRetriever(user_config, [endpoint])
        retriever.wait()
        targets = retriever.GetExecutionTargets()

        if len(targets) == 0:
            raise NoTargetsAvailableError("No targets available")

        # Map absolute local paths to destination file names
        input_files_map = {}
        for filename in input_files:
            if not os.path.isfile(filename):
                raise InputFileError("{} is not a file".format(filename))
            input_files_map[os.path.abspath(filename)] = os.path.basename(filename)

        template = self.env.get_template("job_template.xml")
        jsdl = template.render({
            "name": "ARC job",  # TODO: Use sensible name or omit
            "executable": executable,
            "arguments": args,
            "input_files_map": input_files_map,
            "output_file": self.config.OUTPUT_FILE
        })
        job_description = self.get_job_descriptions(jsdl)[0]

        # Create an empty job object which will contain our submitted job
        job = arc.Job()

        # Submit job directly to the execution targets, without a broker.
        # Try each target until successfully submitted; the loop's else clause only runs
        # if no target accepted the job (i.e. the loop finished without a break)
        for target in targets:
            msg = "Attempting to submit job to {} ({})".format(
                target.ComputingEndpoint.URLString,
                target.ComputingEndpoint.InterfaceName
            )
            self.logger.msg(arc.DEBUG, msg)

            if target.Submit(user_config, job_description, job):
                break
            self.logger.msg(arc.DEBUG, "Failed to submit job")
        else:
            raise JobSubmissionError("Could not submit job to any of the {} available target(s)"
                                     .format(len(targets)))

        self.logger.msg(arc.INFO, "Started job with ID: {}".format(job.JobID))

        # Write information on submitted job to local job list so standard arc tools (arcstat,
        # arcget etc) can be used with this job
        job_list = arc.JobInformationStorageBDB(self.config.JOBS_INFO_FILE)
        if not job_list.Write([job]):
            self.logger.msg(arc.WARNING, "Failed to write to local job list {}"
                                         .format(self.config.JOBS_INFO_FILE))

        return job.JobID

    def get_job_status(self, job_id):
        """
        Return the status of the given job

        :param job_id: ID of the job as returned by `submit_job`

        :raises JobNotFoundError: if no job with the given ID could be found

        :return: The status of the job (see `JobStatuses` for the available values)
        """
        job = self.get_job(job_id)
        # Map ARC status to a value in JobStatuses
        return ARC_STATUS_MAPPING[job.State.GetGeneralState()]

    def cancel_job(self, job_id):
        """
        Cancel the given job

        A failed cancellation is logged as a warning; no exception is raised for it.

        :param job_id: ID of the job as returned by `submit_job`

        :raises JobNotFoundError: if no job with the given ID could be found
        """
        self.logger.msg(arc.INFO, "Cancelling job {}".format(job_id))
        job = self.get_job(job_id)
        if not job.Cancel():
            self.logger.msg(arc.WARNING, "Failed to cancel job")

    def save_job_outputs(self, job_id):
        """
        Retrieve output files from a job and save them to a temp directory. The file/directory
        specified in `OUTPUT_FILE` will be downloaded, and ``stdout`` and ``stderr`` outputs
        are saved as ``stdout.txt`` and ``stderr.txt`` respectively.

        :param job_id: ID of the job as returned by `submit_job`

        :raises JobNotFoundError: if no job with the given ID could be found

        :return: Path to the directory the output files were saved in, or ``None`` if no files
                 were saved
        """
        job = self.get_job(job_id)
        user_config = self.get_user_config()
        temp_dir = tempfile.mkdtemp()

        # Last argument is 'force' - whether to continue if destination directory already
        # exists
        success = job.Retrieve(user_config, arc.URL("file://{}".format(temp_dir)), True)

        # An empty directory counts as failure even if Retrieve reported success
        if success and os.listdir(temp_dir):
            return temp_dir

        # The caller never learns the path on failure, so remove the directory (including
        # any partially downloaded files) rather than leaking it. Cleanup is best-effort.
        shutil.rmtree(temp_dir, ignore_errors=True)
        return None

    def create_proxy(self):
        """
        Use ``arcproxy`` to create a proxy certificate from private key and certificate, and
        save it to the path given in the config

        :raises ProxyGenerationError: if the certificate cannot be generated
        :raises OSError:              if the ``arcproxy`` command could not be run at all
        """
        command = [
            self.config.ARCPROXY_PATH,
            "-C", self.config.CLIENT_CERT,
            "-K", self.config.CLIENT_KEY,
            "-P", self.config.PROXY_FILE,
            "-c", "validityPeriod={}".format(self.config.PROXY_VALIDITY_PERIOD)
        ]
        try:
            output = subprocess.check_output(command)
        except subprocess.CalledProcessError:
            # arcproxy ran but exited with a non-zero status
            raise ProxyGenerationError("Could not create proxy with arcproxy")
        except OSError as ex:
            raise OSError("Failed to run arcproxy command: {}".format(ex))

        self.logger.msg(arc.INFO, "arcproxy output:\n{}".format(output))

    def get_job(self, job_id):
        """
        Return an instance of ``arc.Job`` representing the job with the given ID

        :param job_id: ID of the job as returned by `submit_job`

        :raises JobNotFoundError: if no job with the given ID could be found

        :return: Instance of ``arc.Job`` representing the job
        """
        user_config = self.get_user_config()

        # Create a JobSupervisor to handle all the jobs
        job_supervisor = arc.JobSupervisor(user_config)

        # Retrieve all the jobs from this computing element
        endpoint = arc.Endpoint(self.config.ARC_SERVER, arc.Endpoint.JOBLIST)
        retriever = arc.JobListRetriever(user_config)
        retriever.addConsumer(job_supervisor)
        retriever.addEndpoint(endpoint)
        retriever.wait()

        # Update the states of the jobs
        job_supervisor.Update()

        # Get all jobs and find job by ID
        for job in job_supervisor.GetAllJobs():
            if job.JobID == job_id:
                return job

        raise JobNotFoundError("Could not find a job with ID '{}'".format(job_id))

    def get_user_config(self):
        """
        Return the cached user config, or create a new one. Also check if proxy has expired,
        and create a new one if so

        :raises ProxyGenerationError: if the remaining proxy lifetime cannot be determined, or
                                      a new proxy cannot be generated
        :raises OSError:              if the ``arcproxy`` command could not be run at all

        :return: An instance of ``arc.UserConfig`` (see `create_user_config`)
        """
        # Create a new config if this is the first time. create_user_config also creates a
        # fresh proxy, so there is no need to check expiry here
        if not self.cached_user_config:
            self.cached_user_config = self.create_user_config()
            return self.cached_user_config

        # Check proxy is still valid if using cached user config.
        # Call arcproxy to query how many seconds proxy is valid for
        try:
            output = subprocess.check_output([self.config.ARCPROXY_PATH,
                                              "-P", self.config.PROXY_FILE,
                                              "-i", "validityLeft"])
        except subprocess.CalledProcessError:
            raise ProxyGenerationError("Failed to check proxy expiry time")
        except OSError as ex:
            raise OSError("Failed to run arcproxy command: {}".format(ex))

        try:
            seconds_left = int(output)
        except ValueError as ex:
            raise ProxyGenerationError("Failed to determine proxy expiry time: {}".format(ex))

        if seconds_left <= self.config.PROXY_RENEWAL_THRESHOLD:
            self.logger.msg(arc.INFO, "Renewing proxy")
            self.create_proxy()

        return self.cached_user_config

    def create_user_config(self):
        """
        Create a user config for use with ARC client

        :raises ProxyGenerationError: if the proxy certificate cannot be generated

        :return: An instance of ``arc.UserConfig`` containing information about the necessary
                 keys, certificates and proxy files
        """
        self.create_proxy()

        # Write client config to temp file - arc python library seems buggy when using a
        # proxy file in non-default location. Default location has the current user's
        # UID appended to it, so this is probably the cleanest way
        conf_template = self.env.get_template("arc_config.ini")
        conf_text = conf_template.render({
            "proxy_file": self.config.PROXY_FILE,
            "certs_dir": self.config.CERTS_DIR
        })

        # Text mode so the rendered string can be written directly
        conf_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
        try:
            # Close (and so flush) the file before arc reads it
            with conf_file:
                conf_file.write(conf_text)
            # The path is conf_file.name - write() does not return it
            return arc.UserConfig(conf_file.name)
        finally:
            # Delete the temp file whether or not the config could be created
            os.unlink(conf_file.name)

    def get_job_descriptions(self, jsdl):
        """
        Return an instance of ``arc.JobDescriptionList`` containing the job described by the
        given JSDL

        :param jsdl: String containing the job description in JSDL format

        :raises InvalidJobDescription: if the JSDL could not be parsed

        :return: An instance of ``arc.JobDescriptionList``
        """
        job_descriptions = arc.JobDescriptionList()

        # Text mode so the JSDL string can be written directly
        temp_file = tempfile.NamedTemporaryFile(mode="w", delete=False)
        try:
            # Close (and so flush) the file before arc parses it
            with temp_file:
                temp_file.write(jsdl)
            # The path is temp_file.name - write() does not return it
            if not arc.JobDescription_ParseFromFile(temp_file.name, job_descriptions):
                raise InvalidJobDescription("Could not parse job description XML")
        finally:
            # Delete the temp file - finally clause is run even if exception is raised
            os.unlink(temp_file.name)

        return job_descriptions