""" Misc functions for interacting between the OS and the prisms_jobs """
from __future__ import (absolute_import, division, print_function, unicode_literals)
from builtins import *
import datetime
import os
import pwd
import subprocess
import sys
def _set_encoding(encoding=None):
if encoding is None:
if sys.stdout.encoding is not None:
return sys.stdout.encoding
else:
return 'utf-8'
else:
return encoding
def _decode(val, encoding=None):
try:
if isinstance(val, bytes):
return val.decode(_set_encoding(encoding))
else:
return val
except Exception as e:
print("Exception in prisms_jobs.misc._decode:", e)
print("val:", val)
print("sys.stdout.encoding:", sys.stdout.encoding)
raise e
def run(cmd, input=None, stdin=None, encoding=None):
"""Run subprocess and return stdout, stderr as text, returncode as int
Args:
cmd (List[str]): Command to run as subprocess
input (str): Data to be sent to child process
stdin (stream): Use subprocess.PIPE to pass data via stdin
encoding (str, optional): Encoding to use to decode stdout, stderr. By
default, uses sys.stdout.encoding if available, else 'utf-8'.
Returns:
(stdout, stderr, returncode): With stdout and stderr as strings, and
returncode as int
"""
try:
p = subprocess.Popen(cmd, stdin=stdin, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
encoding = _set_encoding(encoding)
if input is not None:
input = bytearray(input, encoding=encoding)
stdout, stderr = p.communicate(input=input)
return (_decode(stdout, encoding), _decode(stderr, encoding), p.returncode)
except Exception as e:
print("Exception in prisms_jobs.misc.run:", e)
print("cmd:", cmd)
print("input:", input)
print("stdin:", stdin)
print("encoding:", encoding)
print("sys.stdout.encoding:", sys.stdout.encoding)
raise e
[docs]def getlogin():
"""Returns os.getlogin(), else os.environ["LOGNAME"], else "?" """
try:
return pwd.getpwuid(os.getuid())[0]
except:
return os.environ.get("LOGNAME","?")
[docs]def seconds(walltime):
"""Convert [[[DD:]HH:]MM:]SS to hours"""
wtime = walltime.split(":")
if len(wtime) == 1:
return float(wtime[0])
elif len(wtime) == 2:
return float(wtime[0])*60.0 + float(wtime[1])
elif len(wtime) == 3:
return float(wtime[0])*3600.0 + float(wtime[1])*60.0 + float(wtime[2])
elif len(wtime) == 4:
return (float(wtime[0])*24.0*3600.0
+ float(wtime[0])*3600.0
+ float(wtime[1])*60.0
+ float(wtime[2]))
else:
print("Error in walltime format:", walltime)
sys.exit()
[docs]def hours(walltime):
"""Convert [[[DD:]HH:]MM:]SS to hours"""
wtime = walltime.split(":")
if len(wtime) == 1:
return float(wtime[0])/3600.0
elif len(wtime) == 2:
return float(wtime[0])/60.0 + float(wtime[1])/3600.0
elif len(wtime) == 3:
return float(wtime[0]) + float(wtime[1])/60.0 + float(wtime[2])/3600.0
elif len(wtime) == 4:
return (float(wtime[0])*24.0
+ float(wtime[0])
+ float(wtime[1])/60.0
+ float(wtime[2])/3600.0)
else:
print("Error in walltime format:", walltime)
sys.exit()
[docs]def strftimedelta(seconds): #pylint: disable=redefined-outer-name
"""Convert seconds to D+:HH:MM:SS"""
seconds = int(seconds)
day_in_seconds = 24.0*3600.0
hour_in_seconds = 3600.0
minute_in_seconds = 60.0
day = int(seconds/day_in_seconds)
seconds -= day*day_in_seconds
hour = int(seconds/hour_in_seconds)
seconds -= hour*hour_in_seconds
minute = int(seconds/minute_in_seconds)
seconds -= minute*minute_in_seconds
return str(day) + ":" + ("%02d" % hour) + ":" + ("%02d" % minute) + ":" + ("%02d" % seconds)
[docs]def exetime(deltatime):
"""Get the exetime string for the PBS '-a'option from a [[[DD:]MM:]HH:]SS string
exetime string format: YYYYmmddHHMM.SS
"""
return (datetime.datetime.now()
+datetime.timedelta(hours=hours(deltatime))).strftime("%Y%m%d%H%M.%S")