Source code for StructuredData.SDshelllibBase

"""basic function for the shell.
"""

import inspect
import os.path
import pprint
import csv
import io
from StructuredData.internal import myyaml
from StructuredData.internal import Repr
import StructuredData.Classes as SD

__version__="5.1.1" #VERSION#

assert __version__==myyaml.__version__
assert __version__==Repr.__version__
assert __version__==SD.__version__

# pylint: disable= invalid-name

csvdelim= ';'

locked_files= set()
locked_dirs= set()

[docs]def use_file_lock(flag): """switch file locking on and off.""" SD.use_lockfile= flag
[docs]def add_locked_file(filename): """add locked file.""" locked_files.add(os.path.abspath(filename))
[docs]def add_locked_dir(dir_): """add locked dir.""" locked_dirs.add(os.path.abspath(dir_))
[docs]def module_functions(modules): """return all functions of a dict of modules. modules must be a dict mapping module names to module objects. The function returns a dict mapping function names to function objects. """ def fname(mod_name,fun): """function name.""" if not mod_name: return fun return "%s.%s" % (mod_name, fun) funcs= {} for (mod_name, mod) in modules.items(): for m in inspect.getmembers(mod): if not inspect.isfunction(m[1]): continue funcs[fname(mod_name,m[0])]= m[1] return funcs
[docs]def is_readonly(path): """returns True of the path (file) should not be written. """ path= os.path.abspath(path) (head, tail)= os.path.split(path) head= os.path.abspath(head) if head in locked_dirs: return "directory '%s' is read-only" % head p= os.path.join(head, tail) if p in locked_files: return "file '%s' is read-only" % p return None
[docs]def read_py(f): """read as python struct.""" # pylint: disable= eval-used dict_= eval(open(f).read()) return dict_
[docs]def py(val): """format as python.""" return pprint.pformat(val)
[docs]def prettyprint(filename, struct): """pretty print a structure to console or a file.""" if not filename: # filename==None or filename=="": return py(struct) stream= open(filename, 'w') pprint.pprint(struct, stream) stream.close() return filename
[docs]def write_yml(filename, struct): """write in yaml to console or a file.""" if not filename: # filename==None or filename=="": return myyaml.write_string(struct) myyaml.write_file(filename, struct) return filename
[docs]def as_csv(iterator, delimiter): """return as csv data.""" result= io.StringIO() csvwriter= csv.writer(result, delimiter= delimiter, lineterminator= os.linesep) for elm in sorted(iterator): csvwriter.writerow(elm) contents= result.getvalue() result.close() return contents
[docs]def aligned(val): """return aligned.""" maxlen=0 d= {} if hasattr(val, "items"): for (k,v) in val.items(): if len(k)>maxlen: maxlen=len(k) d[k]= v elif hasattr(val, "__iter__"): for (k,v) in val: if len(k)>maxlen: maxlen=len(k) d[k]= v else: raise AssertionError("unsupported val for align: %s" % repr(val)) res= [] for k in sorted(d.keys()): res.append("%s: %s" % (k.ljust(maxlen),d[k])) return "\n".join(res)
[docs]def yaml(val): """return yaml.""" return myyaml.write_string(val)
[docs]def multi(format_, val, delimiter=";"): """multi formatter.""" # pylint: disable= no-else-return if format_=="raw": if isinstance(val, str): # return a string as it is: return val # otherwise return a nice "repr" string: return Repr.repr(val) elif format_=="yaml": return yaml(val) elif format_=='py': return py(val) elif format_=='csv': return as_csv(val, delimiter) else: raise ValueError("ERROR: unexpected format_:%s" % format_)
[docs]class MultiStringOption(): """A helper class for string options. Here are some examples: >>> m= _MultiStringOption({"a":0, "b":0, "c":0, "x":1, "y":1},[None,"y"]) >>> m.parse("x:b") ['b', 'x'] >>> m.parse("x:b:a") Traceback (most recent call last): ... ValueError: contradicting part 'a' in spec 'x:b:a' >>> m.parse("a") ['a', 'y'] >>> m.parse("x") [None, 'x'] >>> m.parse("z") Traceback (most recent call last): ... ValueError: unknown part 'z' in spec 'z' """ def __init__(self, values, defaults): """constructor.""" self._values= values self._defaults= defaults def __repr__(self): """return a repr string for the object.""" return "_MultiStringOption(%s, %s)" % \ (repr(self._values), repr(self._defaults))
[docs] def parse(self, spec): """parse a spec string. parameters: spec - specification string returns: a tuple (result, new): """ results= [None] * len(self._defaults) lst= spec.split(":") for st in lst: if st=="": continue st_l= st.lower() idx= self._values.get(st_l) if idx is None: raise ValueError("unknown part '%s' in spec '%s'" % \ (st, spec)) if results[idx] is not None: raise ValueError("contradicting part '%s' in spec '%s'" % \ (st, spec)) results[idx]= st_l # pylint: disable= consider-using-enumerate for idx in range(len(results)): if results[idx] is None: results[idx]= self._defaults[idx] return results