Source code for StructuredData.SDshelllibFun


implements the functional layer of StructuredData.

import os.path
import glob
import StructuredData.Classes as SD
from StructuredData.internal import rst_load
from StructuredData.internal import myyaml
import StructuredData.SDshelllibBase as base

__version__="5.1.1" #VERSION#

assert __version__==SD.__version__
assert __version__==rst_load.__version__
assert __version__==myyaml.__version__
assert __version__==base.__version__

# pylint: disable=invalid-name

write_defaults= { "container": None,
                  "store"    : None,
                  "types"    : None

std_return= None
# standard return value for commands


[docs]def SDtype(): """return the software stack on which we are running.""" return "library"
[docs]def global_sym(name): """return a global symbol of the given name.""" return globals()[name]
def struc2sym(struc): """return a global symbol from a structure. In XMLRPC we cannot simply transfer references global objects. So can implement here a way to get a global object from an arbitrary structure. The default is to return the object itself. """ return struc def sym2struc(sym): """return structure from a global symbol. vardict is usually global(). In XMLRPC we cannot simply transfer references global objects. So can implement here a way to convert a global object to an arbitrary structure. The default is to return the object itself. """ return sym
[docs]def error(exception, st): """raise an exception (why is this needed?).""" raise exception(st)
_sdshell_rst= None def _load_help(): """load the RST help file.""" global _sdshell_rst # pylint: disable= global-statement if _sdshell_rst is not None: return module_dir= os.path.split(SD.__file__)[0] data_dir= os.path.join(module_dir, "data") sd_shell_help_file= os.path.join(data_dir, "SDpyshell.rst") _sdshell_rst= rst_load.Rst(rst_load.read_file(sd_shell_help_file)) # pylint: disable=redefined-builtin
[docs]def help(item=None, level= None): """implement the shell's help function.""" _load_help() if not item: # filename==None or filename=="": lines=["Please select a topic:",""] lines.extend(_sdshell_rst.index()) elif not isinstance(item, str): lines=["error: 1st parameter must be a string"] else: lines= _sdshell_rst.text(item, level) return "\n".join(lines)
# pylint: enable=redefined-builtin SDC= None
[docs]def namedsdc(arg=None): """create a new StructuredDataContainer-Key. here an sdc-key and an sdc-object are identical. """ sdc= SD.StructuredDataContainer() sdc.set_store(SD.StructuredDataStore()) sdc.set_types(SD.StructuredDataTypes()) return sdc_register(sdc, arg)
[docs]def newsdc(): """create a new StructuredDataContainer-Key.""" return namedsdc(None)
[docs]def locksdc(sdc): """make the sdc read-only.""" sdc_obj= sdc_get(sdc) sdc_obj.lock() return std_return
[docs]def lockfile(filename): """define a filename read-only.""" base.add_locked_file(filename)
[docs]def lockdir(dirname): """define a directory read-only.""" base.add_locked_dir(dirname)
def sdc_register(sdc, user_key= None): """create an sdc key from a new sdc object. This function is not here to be called directly. It intended to be overridden by an application defined function that is then called whenever a new sdc object is created. In this "dummy" implementation of the function, it simply returns the sdc object, so sdc-key and sdc-object are identical here. The user_key is an user provided key for the sdc object. Since this implementation simply returns the sdc object itself, the user_key is ignored here. However, if this function is overridden, the user_key may be taken into account by the new function. """ # pylint: disable=unused-argument return sdc def sdc_get(sdc): """returns an sdc-object from an sdc-key. This function is not here to be called directly. It intended to be overridden by an application defined function that is then called whenever a new sdc object is read or written to. In this "dummy" implementation of the function, it simply returns the sdc object since the sdc-key and sdc-object are identical here. """ global SDC # pylint: disable= global-statement if not sdc: if not SDC: new_key= newsdc() # here an sdc-key and an sdc-object are identical: SDC= new_key return SDC return sdc def _process_paths(paths): """returns an iterator on a single string, a pathlist or a pairlist. """ # pylint: disable=redefined-outer-name if not SD.is_iterable(paths): yield paths return # StopIteration for elm in paths: if isinstance(elm,str) or (not hasattr(elm,"__getitem__")): yield elm else: yield elm[0] return # StopIteration def _path_cmd(path, func): """execute a command on a path or a list of paths.""" if not SD.is_iterable(path): func(path) else: for p in _process_paths(path): func(p) def _path_func(path, func): """execute a function on a path or a list of paths.""" if not SD.is_iterable(path): return func(path) return [func(p) for p in _process_paths(path)]
[docs]def string2pathkey(st): """apply escaping rules on '.','[',']' in st. """ return SD.escape_in_stringkey(st)
[docs]def pathkey2string(pk): """unapply escaping rules on '.','[',']' in pk. """ return SD.unescape_in_stringkey(pk)
[docs]def splitpath(path): """splits a path according to the rules. """ def my_split_path(path): """local split path utility.""" return [sym2struc(k) for k in SD.StructuredDataStore.split_path(path)] return _path_func(path, my_split_path)
[docs]def joinpath(keys): """joins a path according to the rules. """ return SD.StructuredDataStore.join_path([struc2sym(k) for k in keys])
[docs]def combinepaths(paths): """combine partial paths to a new one. """ # pylint: disable=redefined-outer-name keys= [] for p in paths: keys.extend(SD.StructuredDataStore.split_path(p)) return joinpath(keys)
[docs]def addpaths(path1, path2): """combine 2 paths. One may be a list of paths.""" if SD.is_iterable(path1) and SD.is_iterable(path2): error(ValueError, "ERROR: only one arg may be an iterable") return std_return # pylint: disable= no-else-return if SD.is_iterable(path1): def _combine(p): """utility function.""" return combinepaths((p, path2)) return _path_func(path1, _combine) elif SD.is_iterable(path2): def _combine(p): """utility function.""" return combinepaths((path1, p)) return _path_func(path2, _combine) else: return combinepaths((path1, path2))
[docs]def poppath(path, no=1): """removes parts of a path from the end. """ def _poppath(path_): """utility function.""" l= SD.StructuredDataStore.split_path(path_) if no>0: l= l[:-no] else: l= l[-no:] return SD.StructuredDataStore.join_path(l) return _path_func(path, _poppath)
[docs]def substpath(path, pattern): """changes the path according to a pattern. """ pattern_keylist= SD.StructuredDataStore.split_path(pattern) def _substpath(path_): """utility function.""" return SD.StructuredDataStore.join_path( \ SD.SpecialKey.keysubst( SD.StructuredDataStore.split_path(path_), pattern_keylist)) return _path_func(path, _substpath)
_read_formatspec_obj= base.MultiStringOption(\ { "container" : 0, "store" : 0, "types" : 0, "yaml" : 1, "py" : 1, "flat" : 2, "nonflat" : 2, }, ["container", "yaml", "nonflat"] \ ) def _my_filter(sds, path_patterns): """create a new sds with the filtered data.""" m= sds.simple_search(path_patterns= path_patterns, add_values= True, only_leaves= True, show_links= False) sds_new= SD.StructuredDataStore() sds_new.setitems(m, create_missing= True) return sds_new
[docs]def read(filename= None, formatspec="", sdc=None): """generic read from file. If bool(sdc) is False, use the global SDC variable. Note: this function *does not* create a new StructuredDataStore, use newsdc() if you want to create a new one. returns: (sdc, list_of_read_files) """ # pylint: disable=too-many-branches (structure, format_, extra)= _read_formatspec_obj.parse(formatspec) sdc_obj= sdc_get(sdc) if not filename: # filename==None or filename=="": globs= {"container": "./*.SDCyml", "store" : "./*.SDSyml", "types" : "./*.SDTyml"} lst= glob.glob(globs[structure]) else: lst= [filename] if lst: write_defaults[structure]= lst[0] for f in lst: if structure=="container": if format_=="yaml": sdc_new= SD.StructuredDataContainer.from_yaml_file(f) elif format_=='py': dict_= base.read_py(f) sdc_new= SD.StructuredDataContainer(dict_) else: raise AssertionError elif structure in ("store", "types"): if format_=="yaml": dict_= myyaml.read_file(f) elif format_=='py': dict_= base.read_py(f) else: raise AssertionError if structure=="store": if extra=="nonflat": sds= SD.StructuredDataStore(dict_) elif extra=="flat": sds= SD.StructuredDataStore.from_flat_dict(dict_) else: raise AssertionError sdc_new= SD.StructuredDataContainer() sdc_new.set_store(sds) elif structure=="types": sdt= SD.StructuredDataTypes(dict_) sdc_new= SD.StructuredDataContainer() sdc_new.set_types(sdt) else: raise AssertionError else: raise AssertionError sdc_obj.add(sdc_new.as_dict()) return (sdc,lst)
_write_formatspec_obj= base.MultiStringOption( \ { "container" : 0, "store" : 0, "types" : 0, "yaml" : 1, "py" : 1, "csv" : 1, "flat" : 2, "nonflat" : 2, }, ["container", "yaml", "nonflat"] \ )
[docs]def write(filename= None, formatspec="", pattern= None, sdc= None): """generic write to a file. returns: name of the written file or, when not bool(filename), the data itself or (when error() raises no exception) std_return in case of an error. """ # pylint: disable=too-many-statements # pylint: disable=too-many-return-statements # pylint: disable=too-many-branches (structure, format_, extra)= \ _write_formatspec_obj.parse(formatspec) sdc_object= sdc_get(sdc) sdc_local= sdc_object if structure=="store" and is None: error(ValueError, "ERROR: StructuredDataContainer contains no store") return std_return if structure=="types" and sdc_local.types() is None: error(ValueError, "ERROR: StructuredDataContainer contains no types") return std_return if (structure=="container") and filename: # (filename!=None and filename!="") msg= base.is_readonly(filename) if msg is not None: error(IOError, "ERROR: %s" % msg) return std_return if structure in ("container", "store"): if pattern: # pattern!=None and pattern!="": # create a new container with the filtered data: sdc_local= SD.StructuredDataContainer() sdc_local.set_types(sdc_object.types()) sdc_local.set_store(_my_filter(, pattern)) # pylint: disable= no-else-return if structure=="container": if format_=="yaml": if not filename: return sdc_local.as_yaml_string() else: sdc_local.as_yaml_file(filename) return filename elif format_=="py": return base.prettyprint(filename, sdc_local.as_dict()) else: raise AssertionError elif structure=="store": sds= if format_=="csv": # pylint: disable= no-else-return if not filename: # filename==None or filename=="": return sds.as_csv(align_values=True, delimiter=base.csvdelim) else: stream= open(filename, 'w') sds.as_csv(stream=stream, align_values=True, delimiter=base.csvdelim) stream.close() return filename else: if extra=="nonflat": dict_= sds.as_dict() elif extra=='flat': dict_= sds.as_flat_dict() else: raise AssertionError if format_=="yaml": return base.write_yml(filename, dict_) elif format_=="py": return base.prettyprint(filename, dict_) else: raise AssertionError elif structure=="types": sdt= sdc_local.types() # pylint: disable= no-else-return if format_=="yaml": dict_= sdt.as_path_dict() return base.write_yml(filename, dict_) elif format_=="py": dict_= sdt.as_path_dict() return base.prettyprint(filename, dict_) else: error(ValueError, "ERROR: unsupported format_ %s" % format_) return std_return else: raise AssertionError return std_return
[docs]def copy(sdc= None): """returns a copy of a StructuredDataContainer.""" sdc_new= sdc_get(sdc).clone(True) return sdc_register(sdc_new)
[docs]def clear_store(sdc= None): """clear the store of a StructuredDataContainer. """ sdc_get(sdc).clear_store() return std_return
[docs]def clear_types(sdc= None): """clear the types of a StructuredDataContainer. """ sdc_get(sdc).clear_types() return std_return
[docs]def filter_out(pattern, sdc= None): """returns a filtered StructuredDataContainer. """ sdc_obj= sdc_get(sdc) sdc_obj.set_store(_my_filter(, pattern)) return sdc
[docs]def paths(pattern="*", paths= None, sdc= None): """returns a list of all matching paths in a StructuredDataContainer.""" # pylint: disable=redefined-outer-name sds= sdc_get(sdc).store() m= sorted(sds.simple_search(path_patterns= pattern, path_list= paths, add_values= False, only_leaves= False, show_links= False)) return m
[docs]def internal_find(type_, pattern, show_links= False, path_list= None, sdc= None): """internal find function. This is used to cover several flavours of find functions. """ if type_==FIND_SIMPLE: m= sdc_get(sdc).store().simple_search(path_patterns= pattern, path_list= path_list, add_values= True, only_leaves= True, show_links= show_links) elif type_==FIND_IPATTERN: m= sdc_get(sdc).store().i_search(i_pattern= pattern, path_list= path_list, add_values= True, only_leaves= True, show_links= show_links) elif type_==FIND_REGEXP: m= sdc_get(sdc).store().regexp_search(regexp_pattern= pattern, path_list= path_list, add_values= True, only_leaves= True, show_links= show_links) else: raise "Assertion" return m
[docs]def find(pattern, show_links= False, paths= None, sdc= None): """return paths and values for a given pattern. """ # pylint: disable=redefined-outer-name return internal_find(FIND_SIMPLE, pattern=pattern, show_links= show_links, path_list= paths, sdc=sdc)
[docs]def ifind(pattern, show_links= False, paths= None, sdc= None): """return paths and values for a given "i"-pattern. """ # pylint: disable=redefined-outer-name return internal_find(FIND_IPATTERN, pattern=pattern, show_links= show_links, path_list= paths, sdc=sdc)
[docs]def rxfind(pattern, show_links= False, paths= None, sdc= None): """return paths and values for a given regexp. """ # pylint: disable=redefined-outer-name return internal_find(FIND_REGEXP, pattern=pattern, show_links= show_links, path_list= paths, sdc=sdc)
[docs]def internal_findval(type_, val_pattern, show_links= False, pattern= None, path_list= None, sdc= None): """internal findval function. This is used to cover several flavours of findval functions. """ # pylint: disable=too-many-arguments if type_==FINDVAL_SIMPLE: m= sdc_get(sdc).store().value_search(value= val_pattern, path_patterns= pattern, path_list= path_list, add_values= True, show_links= show_links) elif type_==FINDVAL_IPATTERN: m= sdc_get(sdc).store().value_i_search(i_pattern= val_pattern, path_patterns= pattern, path_list= path_list, add_values= True, show_links= show_links) elif type_==FINDVAL_REGEXP: m= sdc_get(sdc).store().value_regexp_search(regexp_pattern= \ val_pattern, path_patterns= pattern, path_list= path_list, add_values= True, show_links= show_links) else: raise "Assertion" return m
[docs]def findval(value, show_links= False, pattern= None, paths= None, sdc= None): """return paths and values for a given value. """ # pylint: disable=redefined-outer-name return internal_findval(FINDVAL_SIMPLE, val_pattern= value, show_links= show_links, pattern= pattern, path_list= paths, sdc=sdc)
[docs]def ifindval(val_pattern, show_links= False, pattern= None, paths= None, sdc= None): """return paths and values for a value matching an "i"-pattern. """ # pylint: disable=redefined-outer-name return internal_findval(FINDVAL_IPATTERN, val_pattern= val_pattern, show_links= show_links, pattern= pattern, path_list= paths, sdc=sdc)
[docs]def rxfindval(val_pattern, show_links= False, pattern= None, paths= None, sdc= None): """return paths and values for a value matching a regexp. """ # pylint: disable=redefined-outer-name return internal_findval(FINDVAL_REGEXP, val_pattern= val_pattern, show_links= show_links, pattern= pattern, path_list= paths, sdc=sdc)
[docs]def get(pattern, show_links= False, paths= None, sdc= None): """get a part of the structure for a given pattern.""" # pylint: disable=redefined-outer-name sdc_obj= sdc_get(sdc) res= [] for(_,_,val) in pattern, path_list= paths, only_leaves= False, sorted_= True, show_links= show_links): res.append(val) if not res: return None if len(res)==1: return res[0] return res
[docs]def change(path, value, sdc= None): """changes a single value.""" def _change(path_): """utility function.""" try: sdc_get(sdc).store()[path_]= value except KeyError as _: error(KeyError, "path \"%s\" doesn't exist" % path_) return std_return return std_return _path_cmd(path, _change) return std_return
[docs]def put(path, value, sdc= None): """add a single value.""" def _put(path_): """utility function.""" try: sdc_get(sdc).store().setitem(path_, value, create_missing= True, always_dicts= False) except KeyError as e: error(KeyError, str(e)) return std_return return std_return _path_cmd(path, _put) return std_return
[docs]def delete(path, sdc= None): """delete a single value.""" def _delete(path_): """utility function.""" try: del sdc_get(sdc).store()[path_] except KeyError as _: error(KeyError, "path \"%s\" doesn't exist" % path_) return std_return return std_return _path_cmd(path, _delete) return std_return
[docs]def typepaths(pattern="*", sdc= None): """find typepaths for a pattern.""" sdt= sdc_get(sdc).types() m= sorted(sdt.simple_search(pattern, add_values= False)) return m
[docs]def typefind(pattern, sdc= None): """find a type for a pattern.""" m= sdc_get(sdc).types().simple_search(pattern, add_values= True) m= [(p,t.to_dict()) for (p,t) in m] return m
[docs]def typeget(path, sdc= None): """get a type for a path.""" sdt= sdc_get(sdc).types() try: val= sdt[path] except KeyError as _: error(KeyError, "path \"%s\" doesn't exist" % path) return std_return if val is None: error(KeyError, "path \"%s\" doesn't exist" % path) return std_return val= val.to_dict() return val
[docs]def typeput(path, value, sdc= None): """set a type.""" try: sdc_get(sdc).types().add({path:value}) except ValueError as e: error(ValueError, str(e)) return std_return
def _find_type_items(sdc, path): """find a typespec item set. This function returns a set object. """ sdt= sdc_get(sdc).types() try: val= sdt[path] except KeyError as e: return (None,"path \"%s\" doesn't exist" % path) try: items= val.items() # method of a SingleTypeSpec object except TypeError as e: return (None, str(e)) return (items, None)
[docs]def typeadditem(path, item, sdc=None): """add an item to a type definition.""" if not isinstance(item, str): error(TypeError, "item must be a string") return std_return (items, msg)= _find_type_items(sdc, path) if items is None: error(KeyError, msg) return std_return items.add(item) return std_return
[docs]def typedelete(path, sdc= None): """delete a type.""" try: del sdc_get(sdc).types()[path] except KeyError as _: error(KeyError, "path \"%s\" doesn't exist" % path) return std_return
[docs]def typedeleteitem(path, item, sdc=None): """delete an item in a type definition.""" if not isinstance(item, str): error(TypeError, "item must be a string") return std_return (items, msg)= _find_type_items(sdc, path) if items is None: error(ValueError, msg) return std_return try: items.remove(item) except KeyError as _: error(KeyError, "cannot delete '%s': not found" % item) return std_return
[docs]def typecheck(sdc= None): """perform a typecheck on the StructuredDataContainer. """ try: sdc_get(sdc).typecheck() except TypeError as e: error(TypeError, "TypeError: %s" % e) return False return True
[docs]def typematch(path, sdc= None): """try to find a typecheck for the given path. """ sdt= sdc_get(sdc).types() if sdt is None: return None m= sdt.match(path, return_matchpath= True) if m is not None: if m[1] is None: m= None if m is None: d= {} else: d= { m[0]: m[1].to_dict() } return d