# Source code for StructuredData.Classes

"""StructuredData - a package to manage StructuredData.
"""
# pylint: disable=too-many-lines, invalid-name, too-many-arguments

import os
import re
import pickle
import pprint
import copy

import csv
import io


from StructuredData.internal import enumclass
from StructuredData.internal import dictutils
from StructuredData.internal import myyaml
from StructuredData.internal import lock
from StructuredData.internal import Repr

# package version; the #VERSION# tag is used by the release tooling
__version__="5.1.1" #VERSION#

# all internal modules must carry exactly the same version string,
# otherwise the installation is inconsistent
assert __version__==enumclass.__version__
assert __version__==dictutils.__version__
assert __version__==myyaml.__version__
assert __version__==lock.__version__
assert __version__==Repr.__version__

# global switch: when True, file access is guarded by lockfiles
use_lockfile= True

# ---------------------------------------------------------
# constants

# timeout (seconds) when waiting for a file lock
FILE_LOCK_TIMEOUT= 30

# ---------------------------------------------------------
# list utilities

def is_iterable(var):
    """Return True if var is a *real* iterable, i.e. not a string."""
    return (not isinstance(var, str)) and hasattr(var, "__iter__")
# ---------------------------------------------------------
# string utilities
def andmatch(st, words):
    """returns True if st contains all the words in any order.

    The comparison is case insensitive.

    Here are some examples:

    >>> st="this is the string that contains some words"
    >>> andmatch(st,["this"])
    True
    >>> andmatch(st,["that","this"])
    True
    >>> andmatch(st,["that","this","there"])
    False
    >>> andmatch(st,["THIS", "There"])
    False
    >>> andmatch(st,["THIS", "That"])
    True
    >>> andmatch(st,["THIS", "That", "contains"])
    True
    """
    # substring membership with "in" replaces the C-style
    # st.find(w)==-1 idiom; all() short-circuits on first miss
    st= st.upper()
    return all(w.upper() in st for w in words)
def imatch(st, pattern):
    """returns True if st contains all words of pattern in any order.

    The pattern is split at whitespace; the comparison is case
    insensitive.

    Here are some examples:

    >>> st="this is the string that contains some words"
    >>> imatch(st, "this")
    True
    >>> imatch(st, "that this")
    True
    >>> imatch(st, "this that there")
    False
    >>> imatch(st, "THIS That contains")
    True
    """
    words= pattern.split()
    return andmatch(st, words)
# --------------------------------------------------------- # file utilities def _cache_filename(filename): """return the name of a pickle file. """ return "%s.pic" % filename def _pickle_write(stream, data): """write something pickled. """ pickle.dump(data, stream, pickle.HIGHEST_PROTOCOL) def _find_cache_file(filename): """try to find the cache file. """ if not os.path.exists(filename): raise IOError("file %s doesn't exist" % filename) cache= _cache_filename(filename) if not os.path.exists(cache): return (cache, "missing") if os.path.getmtime(cache)< os.path.getmtime(filename): return (cache, "older") return (cache, "newer") def _write_cache_file(filename, data): """try to write pickle data. """ cache= _cache_filename(filename) try: f= open(cache, "wb") except IOError: # cannot write return False _pickle_write(f, data) f.close() return True def _read_cached(filename, direct_read_func): """read, optionally from a cache file. """ (cache, state)= _find_cache_file(filename) if state!="newer": data= direct_read_func(filename) _write_cache_file(filename, data) else: stream= open(cache, 'rb') data= pickle.load(stream) stream.close() return data # --------------------------------------------------------- # escaping _re_escaped_wc= re.compile(r'\\+(\*|\*\*|#)$')
def escape_in_stringkey(st):
    r"""escape special characters with a backslash.

    Replaces '.' with '\.', '[' with '\[' and ']' with '\]'; the
    special tokens '*', '**' and '#' (see class SpecialKey) get a
    leading backslash, and an already backslash-escaped wildcard gets
    one more backslash.  This is needed to convert a path containing
    these characters to a keylist (see class StructuredDataStore).

    Here are some examples:

    >>> print(escape_in_stringkey("AB"))
    AB
    >>> print(escape_in_stringkey("A.B[5]C"))
    A\.B\[5\]C
    >>> print(escape_in_stringkey("*"))
    \*
    >>> print(escape_in_stringkey("#"))
    \#
    >>> print(escape_in_stringkey("A#"))
    A#
    >>> print(escape_in_stringkey(r"\*"))
    \\*
    """
    # the bare wildcard / root tokens (related to class SpecialKey)
    wildcard_map= { "*": r"\*", "**": r"\**", "#": r"\#" }
    replaced= wildcard_map.get(st)
    if replaced is not None:
        return replaced
    if st.startswith("\\") and _re_escaped_wc.match(st):
        # an already escaped wildcard: add one more backslash
        return "\\" + st
    for plain, escaped in ((".", r"\."), ("[", r"\["), ("]", r"\]")):
        st= st.replace(plain, escaped)
    return st
def unescape_in_stringkey(st):
    r"""unescape special characters, the inverse of escape_in_stringkey.

    Replaces '\.' with '.', '\[' with '[' and '\]' with ']'; the
    escaped tokens '\*', '\**' and '\#' (see class SpecialKey) become
    the bare token, and a multiply escaped wildcard loses one
    backslash.  Needed to convert a keylist back to a path (see class
    StructuredDataStore).

    Here are some examples:

    >>> print(unescape_in_stringkey("AB"))
    AB
    >>> print(unescape_in_stringkey("A\.B\[2\]C"))
    A.B[2]C
    >>> print(unescape_in_stringkey("\*"))
    *
    >>> print(unescape_in_stringkey("\#"))
    #
    >>> print(unescape_in_stringkey(r"\\*"))
    \*
    """
    # the escaped wildcard / root tokens (related to class SpecialKey)
    wildcard_map= { r"\*": "*", r"\**": "**", r"\#": "#" }
    replaced= wildcard_map.get(st)
    if replaced is not None:
        return replaced
    if st.startswith("\\") and _re_escaped_wc.match(st):
        # a multiply escaped wildcard: strip one backslash
        return st[1:]
    for escaped, plain in ((r"\.", "."), (r"\[", "["), (r"\]", "]")):
        st= st.replace(escaped, plain)
    return st
# --------------------------------------------------------- # SpecialKey # "root" "#" : root symbol needed for typepaths # "any_key" "*" : matches any key in a path, the key must exist # "any_keys" "**" : matches any number of keys in a path, there must # be at least one key present # activate the following for the pylint checker: ## @@@ #SpecialKeyTypesValue= 1 #class SpecialKeyTypes(): # """dummy class, only here for pylint.""" # # pylint: disable= too-few-public-methods # root=1 # any_key= 1 # any_keys=1 # def __init__(self): # pass # #class SDTypes(): # """dummy class, only here for pylint.""" # # pylint: disable= too-few-public-methods # boolean= 1 # integer= 1 # real= 1 # string= 1 # optional_struct= 1 # open_struct= 1 # struct= 1 # list= 1 # typed_map= 1 # optional_list= 1 # typed_list= 1 # map= 1 # def __init__(self): # pass # #def SDTypesValue(_): # # pylint: disable= missing-docstring # return SDTypes() # ## @@@ enumclass.NewEnum(globals(), "SpecialKeyTypes", [ "root", "any_key", "any_keys" ] )
class SpecialKey():
    """ This class implements a data type for special path elements.

    Currently there are three kinds implemented, the "any_key" type
    which matches any key (exactly one key), "any_keys" which matches
    any key (one or more) and the "root" which is used to represent
    the root of a StructuredDataStore.

    Note that in order to make objects of this class usable for being
    dictionary keys, instances for each type of this class are
    instantiated, ANYKEY, ANYKEYS and ROOTKEY.  You should only use
    these and not instantiate SpecialKey objects of your own.  Use the
    function string_to_keyobject to create a SpecialKey object from a
    string, this function uses the predefined objects, too.
    """
    def __init__(self, type_):
        r"""initialize the SpecialKey from a given type.

        The type\_ parameter must be of the type SpecialKeyTypesValue.
        Currently there are only three possible values for this
        parameter, SpecialKeyTypes.any_key, SpecialKeyTypes.any_keys
        and SpecialKeyTypes.root.

        Here are some examples:

        >>> w= SpecialKey(SpecialKeyTypes.any_key)
        >>> repr(w)
        'SpecialKey(SpecialKeyTypes.any_key)'
        >>> str(w)
        '*'
        """
        if not isinstance(type_, SpecialKeyTypesValue):
            raise TypeError("type_ must be a SpecialKeyTypesValue")
        self._type= type_
    def __hash__(self):
        """return hash key.

        id() of the shared enum value: all SpecialKey objects of the
        same kind hash alike.
        """
        return id(self._type)
[docs] def is_root(self): """returns True if the object is of type "root".""" return self._type == SpecialKeyTypes.root
[docs] def simple_wildcard(self): """returns True if the object is of type "any_key".""" return self._type == SpecialKeyTypes.any_key
[docs] def recursive_wildcard(self): """returns True if the object is of type "any_keys".""" return self._type == SpecialKeyTypes.any_keys
[docs] @staticmethod def is_recursive_wildcard(val): """test for recursive wildcard.""" if not isinstance(val, SpecialKey): return False return val.recursive_wildcard()
    # NOTE(review): all comparison operators return a plain bool for
    # non-SpecialKey operands instead of NotImplemented, so mixed-type
    # ordering is one-sided (every SpecialKey sorts after any other
    # type) -- confirm this is intended before changing.
    def __lt__(self, other):
        """returns True if self < other."""
        if not isinstance(other, SpecialKey):
            return False
        # pylint: disable= protected-access
        return self._type < other._type
    def __le__(self, other):
        """returns True if self <= other."""
        if not isinstance(other, SpecialKey):
            return False
        # pylint: disable= protected-access
        return self._type <=other._type
    def __eq__(self, other):
        """returns True if self == other."""
        if not isinstance(other, SpecialKey):
            return False
        # pylint: disable= protected-access
        return self._type ==other._type
    def __ne__(self, other):
        """returns True if self != other."""
        if not isinstance(other, SpecialKey):
            return True
        # pylint: disable= protected-access
        return self._type !=other._type
    def __gt__(self, other):
        """returns True if self > other."""
        if not isinstance(other, SpecialKey):
            return True
        # pylint: disable= protected-access
        return self._type > other._type
    def __ge__(self, other):
        """returns True if self >= other."""
        if not isinstance(other, SpecialKey):
            return True
        # pylint: disable= protected-access
        return self._type >=other._type
    def __repr__(self):
        """ returns a python expression for the object.

        The three predefined singletons are printed by their global
        name (identity check, not equality).
        """
        if self is ROOTKEY:
            return "ROOTKEY"
        if self is ANYKEY:
            return "ANYKEY"
        if self is ANYKEYS:
            return "ANYKEYS"
        return "SpecialKey(%s)" % repr(self._type)
    def __str__(self):
        """This returns the object in a human readable form ('#', '*' or '**')."""
        # pylint: disable= no-else-return
        if self._type== SpecialKeyTypes.root:
            return "#"
        elif self._type== SpecialKeyTypes.any_key:
            return "*"
        elif self._type== SpecialKeyTypes.any_keys:
            return "**"
        else:
            raise AssertionError("unexpected value: %s" % repr(self._type))
[docs] def match(self, val): """matches the wildcard against an item. This method checks if the SpecialKey matches a given string. It returns True if it does, False otherwise. It raises an exception if the SpecialKey object is not a wildcard. Here are some examples: >>> m= SpecialKey(SpecialKeyTypes.any_key) >>> m.match("ab") True >>> m.match("xy") True >>> m.match([1,2]) False >>> m.match(ANYKEY) True >>> m.match(ANYKEYS) True >>> m.match(ROOTKEY) True """ if self._type== SpecialKeyTypes.any_key: if isinstance(val, int): return True if isinstance(val, str): return True if isinstance(val, SpecialKey): return True return False raise AssertionError("unexpected internal type: %s" % repr(self._type))
    @staticmethod
    def keysubst(keylist, pattern_keylist):
        """change a keylist according to a pattern-keylist.

        Each wildcard in the pattern means that the corresponding key
        of the keylist remains unchanged.  The non-wildcard parts in
        the pattern replace the corresponding key.  If the
        pattern_keylist is shorter than the keylist, the remaining
        keys of the keylist are removed.  If the pattern_keylist is
        longer than the keylist the remaining keys of the
        pattern_keylist are appended.  However, these remaining keys
        MUST NOT be wildcards.

        Here are some examples:

        >>> def pathsubst(path, pattern):
        ...     l= SpecialKey.keysubst(StructuredDataStore.split_path(path),
        ...                            StructuredDataStore.split_path(pattern))
        ...     return StructuredDataStore.join_path(l)
        ...
        >>> pathsubst("a.b.c","*.*.x")
        'a.b.x'
        >>> pathsubst("a.b.c","*.y.*")
        'a.y.c'
        >>> pathsubst("a.b.c","*.*")
        'a.b'
        >>> pathsubst("a.b.c","*.*.*.x")
        'a.b.c.x'
        >>> pathsubst("a.b.c","X.**")
        'X.b.c'
        >>> pathsubst("a.b.c","X.**.x.y")
        'X.b.c.x.y'
        >>> pathsubst("a.b.c","*.*.*.*")
        Traceback (most recent call last):
            ...
        ValueError: keylist too short for patternlist
        """
        # pylint: disable=too-many-branches
        src_l= len(keylist)
        pat_l= len(pattern_keylist)
        new= []
        # both index counters advance in lockstep; None marks "past end"
        src_idx= -1
        pat_idx= -1
        while True:
            src_idx+= 1
            if src_idx>=src_l:
                key= None
            else:
                key= keylist[src_idx]
            pat_idx+= 1
            if pat_idx>=pat_l:
                pat= None
            else:
                pat= pattern_keylist[pat_idx]
            if key is not None:
                if pat is None:
                    break # pattern is finished, stop everything
                if not isinstance(pat, SpecialKey):
                    # literal pattern element replaces the key
                    new.append(pat)
                else:
                    if pat.simple_wildcard():
                        # '*' keeps the corresponding key
                        new.append(key)
                    elif pat.recursive_wildcard():
                        # '**' keeps all remaining keys; skip the
                        # source list to its end
                        for i in range(src_idx, src_l):
                            new.append(keylist[i])
                        src_idx= src_l
                    else:
                        raise ValueError("unexpected patternkey '%s'" % \
                                         str(pat))
            elif pat is not None:
                # keylist exhausted: literal pattern keys are appended,
                # but wildcards have nothing left to match
                if not isinstance(pat, SpecialKey):
                    new.append(pat)
                else:
                    raise ValueError("keylist too short for patternlist")
            else:
                break
        return new
    @staticmethod
    def list_match(pattern_list, keylist):
        """matches a pattern_list against a keylist.

        A pattern_list is a list of integers, strings and SpecialKey
        objects.  A keylist is a list of integers and strings.

        Here are some examples:

        >>> def test(pattern,path):
        ...     pl= StructuredDataStore.split_path(pattern)
        ...     l = StructuredDataStore.split_path(path)
        ...     return SpecialKey.list_match(pl, l)
        ...
        >>> test("a.*","a.x")
        True
        >>> test("a.*","v.x")
        False
        >>> test("a.*.c","a[3].c")
        True
        >>> test("a.*.**","a[3].d")
        True
        >>> test("a.*.**","a.b")
        False
        >>> test("a.*","a.b.c")
        False
        >>> test("a.**","a.b.c")
        True
        """
        # pylint: disable=too-many-return-statements, too-many-branches
        l=len(pattern_list)
        kl= len(keylist)
        if kl<l:
            # keylist shorter patternlist
            return False
        # unless the pattern ends with '**', the lengths must be equal
        exact_match= True
        patternkey= pattern_list[-1]
        if isinstance(patternkey, SpecialKey):
            if patternkey.recursive_wildcard():
                exact_match= False
        if exact_match:
            if kl>l:
                return False
        for i in range(l):
            patternkey= pattern_list[i]
            if isinstance(patternkey, SpecialKey):
                # pylint: disable= no-else-continue
                if patternkey.simple_wildcard():
                    # '*': any single existing key matches
                    if not patternkey.match(keylist[i]):
                        return False
                    continue
                elif SpecialKey.recursive_wildcard(patternkey):
                    # '**': matches everything to the end, but is only
                    # legal as the last pattern element
                    if i!=l-1:
                        raise ValueError("'**' pattern only allowed at "+\
                                         "the end of the patternlist")
                    return True
                else:
                    # e.g. a root key in the pattern: compare directly
                    if patternkey!=keylist[i]:
                        return False
            # literal pattern element must equal the key
            # NOTE(review): for SpecialKey patternkeys this re-checks
            # the comparison above -- looks redundant but harmless
            if patternkey!=keylist[i]:
                return False
        return True
# the singleton SpecialKey instances; always use these instead of
# creating new SpecialKey objects (__repr__ relies on their identity):
ROOTKEY = SpecialKey(SpecialKeyTypes.root)
# the string form of the root key, "#"
root_key_str = str(ROOTKEY)
ANYKEY = SpecialKey(SpecialKeyTypes.any_key)
ANYKEYS = SpecialKey(SpecialKeyTypes.any_keys)
def string_to_keyobject(val):
    """convert a string key to the object used by MatchPaths et al.

    The tokens '#', '*' and '**' are mapped to the predefined
    singletons ROOTKEY, ANYKEY and ANYKEYS; every other string is
    unescaped; non-string values are passed through unchanged.
    """
    if not isinstance(val, str):
        return val
    if val=="#":
        return ROOTKEY
    if val=="*":
        return ANYKEY
    if val=="**":
        return ANYKEYS
    return unescape_in_stringkey(val)
# --------------------------------------------------------- # type checking enumclass.NewEnum(globals(), "SDTypes", [ "boolean", "integer", "real", "string", "optional_struct", "open_struct", "struct", "typed_map", "map", "optional_list", "typed_list", "list", ])
def check_type_scalar(var, spec, dry_run=False):
    """implement a simple typecheck for scalar types.

    Known scalar specs:

    SDTypes.boolean : a bool
    SDTypes.integer : an int
    SDTypes.real    : a float (an int is accepted, too)
    SDTypes.string  : a str

    When dry_run is True only the spec is validated, var is ignored.
    Raises TypeError when the check fails and AssertionError for an
    unknown scalar spec.

    Here are some examples:

    >>> check_type_scalar(1,SDTypes.integer)
    >>> check_type_scalar(1.2,SDTypes.integer)
    Traceback (most recent call last):
        ...
    TypeError: integer expected, got: 1.2
    >>> check_type_scalar(1.2,SDTypes.real)
    >>> check_type_scalar(1,SDTypes.real)
    >>> check_type_scalar("A",SDTypes.string)
    >>> check_type_scalar(True,SDTypes.boolean)
    >>> check_type_scalar("True",SDTypes.boolean)
    Traceback (most recent call last):
        ...
    TypeError: boolean expected, got: 'True'
    """
    if not isinstance(spec, SDTypesValue):
        raise TypeError("spec must be of type SDTypesValue")
    # accepted classes and error label per scalar spec
    scalar_types= {
        SDTypes.boolean: ((bool,), "boolean"),
        SDTypes.integer: ((int,), "integer"),
        SDTypes.real   : ((float, int), "real number"),
        SDTypes.string : ((str,), "string"),
    }
    entry= scalar_types.get(spec)
    if entry is None:
        raise AssertionError("unexpected typespec: \"%s\"" % spec)
    if dry_run:
        return
    (classes, label)= entry
    if not isinstance(var, classes):
        raise TypeError("%s expected, got: %s" % (label, repr(var)))
def check_type(var, spec, dry_run=False):
    """implement more complex typechecks.

    These are the known types:

    SDTypes.boolean
        a bool
    SDTypes.integer
        an int
    SDTypes.real
        a float (an int is accepted, too)
    SDTypes.string
        a str
    SDTypes.optional_struct [<list of items>]
        This is a dict where each key must be present in the
        <list of items>.
    SDTypes.open_struct [<list of items>]
        This is a dict where each item of <list of items> must be
        present as a key.  Aside from this, the dict may contain an
        arbitrary set of additional keys.
    SDTypes.struct [<list of items>]
        This is a dict where each item of <list of items> must be
        present as a key.  No other keys are allowed than the ones
        listed in <list of items>.
    SDTypes.typed_map <scalar type>
        This is a dict where each key must be of the type
        <scalar type>.
    SDTypes.map
        This is simply a dict.
    SDTypes.optional_list [<list of items>]
        This is a list where each item must be present in
        <list of items>.
    SDTypes.typed_list <scalar type>
        This is a list where each item must be of the type
        <scalar type>.
    SDTypes.list
        This is simply a list.

    When dry_run is True only the spec is validated, var is ignored.
    Raises TypeError when the check fails.

    Here are some examples:

    >>> check_type(1,SDTypes.integer)
    >>> check_type(1.2,SDTypes.integer)
    Traceback (most recent call last):
        ...
    TypeError: integer expected, got: 1.2
    >>> check_type({"a":1,"c":1},{SDTypes.optional_struct:["a","b","c"]})
    >>> check_type({"a":1,"d":1},{SDTypes.optional_struct:["a","b","c"]})
    Traceback (most recent call last):
        ...
    TypeError: key d not in list of allowed keys
    >>> check_type({"a":1,"b":1,"c":1,"d":1},{SDTypes.open_struct:["a","b","c"]})
    >>> check_type({"a":1,"b":1,"d":1},{SDTypes.open_struct:["a","b","c"]})
    Traceback (most recent call last):
        ...
    TypeError: key c is missing
    >>> check_type({"a":1,"b":1,"c":1},{SDTypes.struct:["a","b","c"]})
    >>> check_type({"a":1,"b":1},{SDTypes.struct:["a","b","c"]})
    Traceback (most recent call last):
        ...
    TypeError: mandatory key c is missing
    >>> check_type({"a":1,"b":1,"c":1,"d":1},{SDTypes.struct:["a","b","c"]})
    Traceback (most recent call last):
        ...
    TypeError: key d is not in list of allowed keys
    >>> check_type({"a":1,"b":1},{SDTypes.typed_map:SDTypes.string})
    >>> check_type({2:1,"b":1},{SDTypes.typed_map:SDTypes.string})
    Traceback (most recent call last):
        ...
    TypeError: string expected, got: 2
    >>> check_type({"a":1,"c":1},SDTypes.map)
    >>> check_type([1,2],SDTypes.map)
    Traceback (most recent call last):
        ...
    TypeError: map expected
    >>> check_type(["a","c"],{SDTypes.optional_list:["a","b","c"]})
    >>> check_type(["a","b"],{SDTypes.typed_list:SDTypes.string})
    >>> check_type(["a","b",1],{SDTypes.typed_list:SDTypes.string})
    Traceback (most recent call last):
        ...
    TypeError: string expected, got: 1
    >>> check_type([1,2],SDTypes.list)
    >>> check_type(1,SDTypes.list)
    Traceback (most recent call last):
        ...
    TypeError: list expected
    """
    # pylint: disable=too-many-return-statements, too-many-branches
    # pylint: disable=too-many-statements
    # pylint: disable= no-else-return
    if isinstance(spec, dict):
        # a dict spec has exactly one key: the type; its value is the
        # list of allowed items or the scalar element type
        if len(spec)!=1:
            raise AssertionError("spec dictionary must have exactly one key")
        (spec_key,spec_val)= list(spec.items())[0]
        if not isinstance(spec_key, SDTypesValue):
            raise TypeError("spec_key must be of type SDTypesValue")
        if spec_key==SDTypes.optional_struct:
            if not hasattr(spec_val, "__contains__"):
                raise AssertionError("spec value must be a list or set")
            if dry_run:
                return
            if not isinstance(var, dict):
                raise TypeError("structure expected")
            # each key must be in list of allowed keys
            for key in var.keys():
                if not key in spec_val:
                    raise TypeError("key %s not in list of allowed keys" % key)
            return
        elif spec_key==SDTypes.open_struct:
            if not hasattr(spec_val, "__contains__"):
                raise AssertionError("spec value must be a list or set")
            if dry_run:
                return
            if not isinstance(var, dict):
                raise TypeError("structure expected")
            # each of the allowed keys must be present
            keys= set(var.keys())
            for a in spec_val:
                if a not in keys:
                    raise TypeError("key %s is missing" % a)
            return
        elif spec_key==SDTypes.struct:
            if not hasattr(spec_val, "__contains__"):
                raise AssertionError("spec value must be a list or set")
            if dry_run:
                return
            if not isinstance(var, dict):
                raise TypeError("structure expected")
            # exactly all of the allowed keys must be present
            keys= set(var.keys())
            if isinstance(spec_val, set):
                allowed= spec_val
            else:
                allowed= set(spec_val)
            all_= keys.union(allowed)
            for k in all_:
                if k not in keys:
                    raise TypeError("mandatory key %s is missing" % k)
                if k not in allowed:
                    raise TypeError("key %s is not in list of allowed keys" % k)
            return
        elif spec_key==SDTypes.typed_map:
            # validate the scalar element spec first (dry run)
            check_type_scalar(None, spec_val, True)
            if dry_run:
                return
            if not isinstance(var, dict):
                raise TypeError("map expected")
            for k in var.keys():
                check_type_scalar(k, spec_val)
            return
        elif spec_key==SDTypes.optional_list:
            if not hasattr(spec_val, "__contains__"):
                raise AssertionError("spec value must be a list or set")
            if dry_run:
                return
            if not isinstance(var, list):
                raise TypeError("list expected")
            # each item must be in list of allowed keys
            allowed= spec_val
            for item in var:
                if not item in allowed:
                    raise TypeError("item %s not in list of allowed keys" % item)
            return
        elif spec_key==SDTypes.typed_list:
            # validate the scalar element spec first (dry run)
            check_type_scalar(None, spec_val, True)
            if dry_run:
                return
            if not isinstance(var, list):
                raise TypeError("list expected")
            for k in var:
                check_type_scalar(k, spec_val)
            return
        else:
            # BUGFIX: this used to reference the undefined loop
            # variable "k", raising NameError instead of the intended
            # AssertionError (e.g. for {SDTypes.map: ...})
            raise AssertionError("unknown type spec: %s" % repr(spec_key))
    elif isinstance(spec, SDTypesValue):
        if spec==SDTypes.map:
            if dry_run:
                return
            if not isinstance(var, dict):
                raise TypeError("map expected")
            return
        elif spec==SDTypes.list:
            if dry_run:
                return
            if not isinstance(var, list):
                raise TypeError("list expected")
            return
        else:
            # the remaining simple specs are scalars
            check_type_scalar(var, spec, dry_run)
            return
    else:
        raise AssertionError("unknown type spec: \"%s\"" % repr(spec))
class SingleTypeSpec():
    """a single type specification.

    Wraps one type spec (see check_type) in a form optimized for
    quicker typechecks: lists in the spec are converted to sets and
    type name strings to SDTypesValue objects.

    Here are some examples:

    >>> s= SingleTypeSpec("integer")
    >>> repr(s)
    "SingleTypeSpec('integer')"
    >>> s.check(1)
    >>> s.check(1.2)
    Traceback (most recent call last):
        ...
    TypeError: integer expected, got: 1.2
    >>> s=SingleTypeSpec({"optional_struct":["x","y","z"]})
    >>> s.check({"x":1,"z":2})
    >>> s.check({"x":1,"a":2})
    Traceback (most recent call last):
        ...
    TypeError: key a not in list of allowed keys
    """
    def __init__(self, dict_):
        # dict_ may be a type name string or a one-key spec dict,
        # see _from_SD for the accepted forms
        self._val= SingleTypeSpec._from_SD(dict_)
        # self._val is either a SDTypesValue or a
        # dict mapping a SDTypesValue to an SDTypesValue or a set
[docs] def check(self, var, dry_run= False): """check a type.""" return check_type(var, self._val, dry_run=dry_run)
    def __repr__(self):
        """return a python expression recreating the object."""
        d= self.to_dict()
        return "SingleTypeSpec(%s)" % repr(d)
    def __str__(self):
        """return the object in human readable, pretty-printed form."""
        d= self.to_dict()
        lines= ["SingleTypeSpec("]
        # pretty-print the dict form, wrapped at 70 columns
        l= pprint.pformat(d, indent=2, width=70).split("\n")
        if len(l)==1:
            # short specs fit on a single line
            return "SingleTypeSpec(%s)" % l[0]
        l[-1]+="\n)"
        lines.extend(l)
        return "\n ".join(lines)
[docs] def items(self): """returns the internal set of items if there is one. Since this method returns the internal set object, it can be used to manipulate the set directly. Here are some examples: >>> s= SingleTypeSpec("integer") >>> s.items() Traceback (most recent call last): ... TypeError: the typespec has no items >>> s=SingleTypeSpec({"optional_struct":["x","y","z"]}) >>> print(Repr.repr(s.items())) {'x', 'y', 'z'} >>> s.items().add("w") >>> print(s) SingleTypeSpec({'optional_struct': ['w', 'x', 'y', 'z']}) >>> s.items().remove("x") >>> print(s) SingleTypeSpec({'optional_struct': ['w', 'y', 'z']}) """ if not isinstance(self._val, dict): raise TypeError("the typespec has no items") k= list(self._val.keys())[0] v= self._val[k] if not isinstance(v, set): raise TypeError("the typespec has no items") return v
    @staticmethod
    def _from_SD(val):
        """helper to create a SingleTypeSpec from a dict or str.

        If val is a str (string) this function returns a SDTypesValue
        created from that string.  If val is a dictionary this
        function expects that this dictionary has exactly one key.  It
        returns a dictionary with one key which is a SDTypesValue
        created from the single dictionary key that is mapped to an
        SDTypesValue or a set.
        """
        if isinstance(val, dict):
            if len(val)!=1:
                raise AssertionError("spec dictionary must have exactly one key")
            sk= list(val.keys())[0]
            k= SDTypesValue(sk)
            v= val[sk]
            if isinstance(v, list):
                # item lists become sets for O(1) membership tests
                v= set(v)
            elif isinstance(v, str):
                v= SDTypesValue(v)
            else:
                raise AssertionError("unexpected val %s in spec %s" % \
                                     (repr(v), repr(val)))
            return { k: v }
        if isinstance(val, str):
            try:
                return SDTypesValue(val)
            except ValueError:
                # NOTE(review): consider "raise ... from None" here to
                # suppress the chained exception context
                raise ValueError("unknown typespec: \"%s\"" % val)
        raise AssertionError("unexpected spec: %s" % repr(val))
[docs] def to_dict(self): """convert the SingleTypeSpec object to a dictionary.""" if isinstance(self._val, dict): if len(self._val)!=1: raise AssertionError("self._val dictionary must have exactly one key") k= list(self._val.keys())[0] if not isinstance(k, SDTypesValue): raise TypeError("key must be of type SDTypesValue") v= self._val[k] k= str(k) if isinstance(v, set): v= sorted(v) elif isinstance(v, SDTypesValue): v= str(v) else: raise AssertionError("unexpected val %s in self._val %s" % \ (repr(v), repr(self._val))) return { k: v } if isinstance(self._val, SDTypesValue): return str(self._val) raise AssertionError("unexpected self._val: %s" % repr(self._val))
# ---------------------------------------------------------
# notes:
# keylist:
#     a list of strings/ints
class StructuredDataStore():
    """defines a universal data store.

    This class is used to represent the StructuredDataStore.  A
    StructuredDataStore contains the pure data as a python dictionary
    which may contain simple values or references to further
    dictionaries or lists.

    It also may contain a *link dictionary*.  This dictionary maps
    python object ids (every python object has an id) to sets of
    keylists.  When the StructuredDataStore is modified by using it's
    methods it tries to keep the link dictionary up to date (if it
    exists), but it cannot cover all cases.  Especially you should use
    the *link* method when you plan to refer to the same data at
    several places of the StructuredDataStore.
    """
    # pylint: disable=too-many-public-methods
    # an unescaped '[' (not preceded by a backslash)
    _re_br= re.compile(r'(?<!\\)(\[)')
    # an unescaped '.' used to split path strings
    _re_split= re.compile(r'(?<!\\)\.')
    # a list index like "[12]", capturing the digits
    _re_no= re.compile(r'\[(\d+)\]')
    # -----------------------------------------------------
    # creation / initialization
[docs] @staticmethod def from_flat_dict(data=None): """create StructuredDataStore from a flat dictionary. This initializes the object from a flat dictionary structure. A flat dictionary is a dictionary that maps paths to values. These paths and the values are used to generate a complete StructuredDataStore structure. Here is an example: >>> p= StructuredDataStore.from_flat_dict({"A.B.C":1, "A.B.D":2}) >>> print(p) StructuredDataStore({'A': {'B': {'C': 1, 'D': 2}}}) """ if data is None: data= {} new= StructuredDataStore() new.setitems(iter(data.items()), create_missing= True) return new
[docs] def clone(self, deepcopy= True): """clones a StructuredDataStore object. This creates a copy of the StructuredDataStore. If the parameter deepcopy is True, a *deep* copy is performed where the new StructuredDataStore is completely independent from the first StructuredDataStore meaning that changes on the first one do never affect the new one. Here are some examples: >>> d= {"A": {"B": 1, "C": 2}, ... "x": {"y":None}, ... "l": [["val"], 1, None] ... } >>> d["x"]["y"]= d["A"] >>> d["l"][2]= d["l"][0] >>> p= StructuredDataStore(d) >>> print(p) StructuredDataStore( { 'A': {'B': 1, 'C': 2}, 'l': [['val'], 1, ['val']], 'x': {'y': {'B': 1, 'C': 2}}} ) >>> shallow= p.clone(False) >>> deep = p.clone(True) >>> p["A"]["B"]="CHANGED!" >>> print(p) StructuredDataStore( { 'A': {'B': 'CHANGED!', 'C': 2}, 'l': [['val'], 1, ['val']], 'x': {'y': {'B': 'CHANGED!', 'C': 2}}} ) >>> print(shallow) StructuredDataStore( { 'A': {'B': 'CHANGED!', 'C': 2}, 'l': [['val'], 1, ['val']], 'x': {'y': {'B': 'CHANGED!', 'C': 2}}} ) >>> print(deep) StructuredDataStore( { 'A': {'B': 1, 'C': 2}, 'l': [['val'], 1, ['val']], 'x': {'y': {'B': 1, 'C': 2}}} ) """ new= StructuredDataStore() # pylint: disable= protected-access if not deepcopy: new._data= copy.copy(self._data) else: new._data= copy.deepcopy(self._data) return new
def __init__(self, data=None): """initialize the object. This initializes the object. A python dictionary can be given as an optional parameter. In this case, the internal dictionary becomes a reference to the given dictionary. """ if data is None: data= {} self._data= data self._link_dict= None self._locked= False # ----------------------------------------------------- # exporting
[docs] def as_dict(self): """return the as_dict data. This method returns the StructuredDataStore as a python dictionary. Note that this does not copy the data, it just returns the internal dictionary. Here is a simple example: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> Repr.printrepr(p.as_dict()) {'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 1.23}]} """ if self._locked: raise ValueError("returning StructuredDataStore as a dict failed, "+\ "object is locked") return self._data
[docs] def as_flat_dict(self, pattern="", show_links= False, path_list= None, ): """return the data in the "flat dict" form. This method returns the StructuredDataStore as a flat python dictionary. This is a dictionary that maps paths to values. Here are some examples: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> Repr.printnice(p.as_flat_dict(), 0) { 'key1': 1, 'key2.A': 'x', 'key2.B': 'y', 'key3[0]': 1, 'key3[1]': 2, 'key3[2]': 3, 'key3[3].float': 1.23 } >>> Repr.printrepr(p.as_flat_dict(pattern="key3.**")) {'key3[0]': 1, 'key3[1]': 2, 'key3[2]': 3, 'key3[3].float': 1.23} >>> Repr.printrepr(p.as_flat_dict(pattern="key3[3].**")) {'key3[3].float': 1.23} >>> Repr.printrepr(p.as_flat_dict(pattern="key3[3].float")) {'key3[3].float': 1.23} >>> Repr.printrepr(p.as_flat_dict(pattern=["key2.**","key3.*"])) {'key2.A': 'x', 'key2.B': 'y', 'key3[0]': 1, 'key3[1]': 2, 'key3[2]': 3} """ # if show_links: # join= lambda l: self.annotated_join_path(l) # else: # join= StructuredDataStore.join_path d={} for (path,_,val) in self.universal_iter(patterns=pattern, path_list= path_list, only_leaves= True, sorted_= False, show_links= show_links): # print("PATH:",path,"VAL:",val) d[path]= val return d
    def as_csv(self, stream= None, delimiter=',',
               align_values= False,
               value_first= False):
        """return the data in csv form.

        This method returns the StructuredDataStore as csv. If the
        parameter "stream" is supplied it writes to that stream,
        otherwise it returns a string.

        NOTE(review): when a stream is given, this method *closes* it
        before returning -- confirm callers expect that.

        parameters:

        stream          the stream to print to. If this is None, return
                        a string with the data.
        delimiter       the csv field separator, "," by default.
        align_values    the value is in the last column, all values are
                        aligned meaning they are all at the same column
                        number. If this parameter is False, the values
                        are in the last column of each line but they may
                        be at different column numbers in the different
                        lines.
        value_first     put the value in the first column, *before* all
                        keys.
                        NOTE(review): value_first has no effect when
                        align_values is True -- looks unintended, verify.

        Here is a simple example:

        >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]}
        >>> p= StructuredDataStore(d)
        >>> print(p.as_csv())
        key1,1
        key2,A,x
        key2,B,y
        key3,0,1
        key3,1,2
        key3,2,3
        key3,3,float,1.23
        <BLANKLINE>
        >>> print(p.as_csv(value_first= True))
        1,key1
        x,key2,A
        y,key2,B
        1,key3,0
        2,key3,1
        3,key3,2
        1.23,key3,3,float
        <BLANKLINE>
        >>> print(p.as_csv(align_values=True))
        key1,,,1
        key2,A,,x
        key2,B,,y
        key3,0,,1
        key3,1,,2
        key3,2,,3
        key3,3,float,1.23
        <BLANKLINE>
        """
        # pylint: disable=too-many-locals
        # write either into the caller's stream or a local buffer:
        if stream is not None:
            result= stream
        else:
            result= io.StringIO()
        csvwriter= csv.writer(result, delimiter= delimiter,
                              lineterminator= os.linesep)
        if align_values:
            # collect (keylist, value) pairs first; rows can only be
            # padded once the maximum keylist length is known:
            pair_list= []
            max_keylen= 0
        for (_,keys,val) in self.direct_iter(pattern="",
                                             only_leaves= True,
                                             sorted_= True,
                                             show_links= False):
            if align_values:
                pair_list.append((keys[:], val))
                # ^^^ necessary since direct_iter returns *the same*
                # list object at each iteration
                max_keylen= max(max_keylen, len(keys))
            else:
                if value_first:
                    csvwriter.writerow([val]+keys)
                else:
                    csvwriter.writerow(keys+[val])
        if align_values:
            # second pass: pad every keylist to max_keylen so all
            # values end up in the same column:
            for keylist, v in pair_list:
                if len(keylist)<max_keylen:
                    keylist.extend([""]*(max_keylen-len(keylist)))
                keylist.append(v)
                csvwriter.writerow(keylist)
        if stream is None:
            contents= result.getvalue()
            result.close()
            return contents
        # a caller-supplied stream is closed here (see NOTE above):
        stream.close()
        return None
# ----------------------------------------------------- # other global functions
[docs] def lock(self): """make the StructuredDataStore read-only.""" self._locked= True
[docs] def clear(self): """delete all data. This removes all data from the object. """ if self._locked: raise ValueError("modification of StructuredDataStore failed, "+\ "object is locked") self._data= {}
# ----------------------------------------------------- # adding data from python structures
[docs] def add(self, data): """adds new data to the StructuredDataStore object. This adds new data from a given dictionary representation of a StructuredDataStore. Note that different from the update method of dict, dictionaries in the structure that already exist are not replaced with the ones given in the new structure but new keys are added. Data must be a dict or a list of pairs. """ if self._locked: raise ValueError("modification of StructuredDataStore failed, "+\ "object is locked") dictutils.struct_add(self._data, data)
[docs] def update(self, data): """updates the StructuredDataStore from the given dictionary. This updates the StructuredData object from a given dictionary. Note that existing dictionaries are replaced by the ones in the given data. You usually want to use the "add" method instead of this one. """ if self._locked: raise ValueError("modification of StructuredDataStore failed, "+\ "object is locked") self._data.update(data)
# ----------------------------------------------------- # querying/adding/modifying by keylists
    def keylist_set(self, keylist, new_value= None, always_dicts= False):
        """set the object that is identified by a keylist.

        This method sets a value in the StructuredDataStore that is
        referred by the given keylist. Missing intermediate structures
        are created on the fly; the last data item created is given by
        the parameter new_value, which defaults to the value "None".
        If always_dicts is True only dicts are created, never lists,
        even for integer keys. The new_value is also returned.

        Note that this updates the internal link dictionary correctly
        only if the value given is not yet present in the
        StructuredDataStore. If the value *is* already present in the
        StructuredDataStore you should use the method link instead.

        Here are some examples:

        >>> p=StructuredDataStore({})
        >>> p.keylist_set(["ab"])
        >>> p
        StructuredDataStore({'ab': None})
        >>> p.keylist_set(["ab","x","y",1,"a"])
        >>> p
        StructuredDataStore({'ab': {'x': {'y': [None, {'a': None}]}}})
        >>> p.keylist_set(["ab","x","z"],"myvalue")
        'myvalue'
        >>> p
        StructuredDataStore({'ab': {'x': {'y': [None, {'a': None}], 'z': 'myvalue'}}})
        """
        if self._locked:
            raise ValueError("modification of StructuredDataStore failed, "+\
                             "object is locked")
        # drop any stale link entry for the path before overwriting:
        if self._link_dict is not None:
            self._links_remove(keylist)
        dictutils.struct_keylist_add(self._data, keylist,
                                     new_value, always_dicts)
        return new_value
[docs] def keylist2object(self, keylist): """get the object that is identified by a keylist. This method returns for a given keylist the part of the StructuredDataStore it adresses. Here are some examples: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> p.keylist2object(["key2"]) {'A': 'x', 'B': 'y'} >>> p.keylist2object(["key2","B"]) 'y' >>> p.keylist2object(["key3",1]) 2 >>> p.keylist2object(["key3",3]) {'float': 1.23} >>> p.keylist2object(["key3",3,"float"]) 1.23 >>> p.keylist2object(["key3",3,"floatx"]) Traceback (most recent call last): ... KeyError: "['key3', 3, 'floatx'] not found (subkey: 'floatx')" """ return dictutils.struct_keylist_get(self._data, keylist)
[docs] def listsetitem(self, keylist, val, create_missing= False, always_dicts= False): """similar to setitem, but gets a keylist. This method sets for a given path the part of the StructuredDataStore it addresses. If the parameter create_missing is True, this function can also create data structures on the fly when the given keys do not yet exist at some place in the structure. If the parameter always_dicts is True, the created structures are always dicts and never lists even if some of the elements of the path are integers. """ if self._locked: raise ValueError("modification of StructuredDataStore failed, "+\ "object is locked") if create_missing: self.keylist_set(keylist, val, always_dicts= always_dicts) else: dictutils.struct_keylist_change(self._data, keylist, val) if self._link_dict is not None: self._links_remove(keylist)
[docs] def listdelitem(self, keylist): """deletes an item from the store. This is similar to delitem but gets a keylist instead of a path. Here is an example: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> print(p) StructuredDataStore( { 'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 1.23}]} ) >>> p.listdelitem(["key3",3,"float"]) >>> print(p) StructuredDataStore( {'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {}]} ) >>> p.listdelitem(["key3",3]) >>> print(p) StructuredDataStore( {'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3]} ) """ if self._locked: raise ValueError("modification of StructuredDataStore failed, "+\ "object is locked") last= keylist[-1] new_keylist= keylist[0:-1] del self.keylist2object(new_keylist)[last] self._links_remove(keylist)
# ----------------------------------------------------- # querying/adding/modifying by paths
[docs] def path2object(self, path): """get the object a path points to. This method returns for a given path the part of the StructuredDataStore it adresses. Here are some examples: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> p.path2object("key2") {'A': 'x', 'B': 'y'} >>> p.path2object("key2.A") 'x' >>> p.path2object("key3[1]") 2 >>> p.path2object("key3[3]") {'float': 1.23} >>> p.path2object("key3[3].float") 1.23 >>> p.path2object("key3[3].floatx") Traceback (most recent call last): ... KeyError: "['key3', 3, 'floatx'] not found (subkey: 'floatx')" """ return self.keylist2object(StructuredDataStore.split_path(path))
def __getitem__(self, path): """return the object the path points to. This method returns for a given path the part of the StructuredDataStore it addresses. This means that you may read elements in a StructuredDataStore object by addressing it like a dictionary like "object[path]". Here are some examples: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> p["key2"] {'A': 'x', 'B': 'y'} >>> p["key2.A"] 'x' >>> p["key3[1]"] 2 >>> p["key3[3]"] {'float': 1.23} >>> p["key3[3].float"] 1.23 >>> p["key3[3].floatx"] Traceback (most recent call last): ... KeyError: "['key3', 3, 'floatx'] not found (subkey: 'floatx')" """ return self.path2object(path) def __setitem__(self, path, val): """sets an item in the StructuredDataStore. This method sets for a given path the part of the StructuredDataStore it addresses. This means that you may assign elements in a StructuredDataStore object by addressing it like a dictionary like "object[path]=value". """ return self.setitem(path, val, create_missing= False)
[docs] def setitem(self, path, val, create_missing= False, always_dicts= False): """set the object the path points to. This method sets for a given path the part of the StructuredDataStore it addresses. If the parameter create_missing is True, this function can also create data structures on the fly when the given keys do not yet exist at some place in the structure. If the parameter always_dicts is True, the created structures are always dicts and never lists even if some of the elements of the path are integers. Here are some examples: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> print(p) StructuredDataStore( { 'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 1.23}]} ) >>> p["key1"]= 10 >>> print(p) StructuredDataStore( { 'key1': 10, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 1.23}]} ) >>> p["key2.A"]= "xx" >>> print(p) StructuredDataStore( { 'key1': 10, 'key2': {'A': 'xx', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 1.23}]} ) >>> p["key3[3].float"]= 2.46 >>> print(p) StructuredDataStore( { 'key1': 10, 'key2': {'A': 'xx', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 2.46}]} ) >>> p["key3[3].floatx"]= 3.46 Traceback (most recent call last): ... KeyError: "['key3', 3, 'floatx'] not found (subkey: 'floatx')" >>> p.setitem("key3[3].floatx",3.46,create_missing=True) >>> print(p) StructuredDataStore( { 'key1': 10, 'key2': {'A': 'xx', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 2.46, 'floatx': 3.46}]} ) """ if self._locked: raise ValueError("modification of StructuredDataStore failed, "+\ "object is locked") keylist= StructuredDataStore.split_path(path) self.listsetitem(keylist, val, create_missing=create_missing, always_dicts= always_dicts)
[docs] def setitems(self, pairs, create_missing= False, always_dicts= False): """add new items to the object from a list of pairs. This method sets for a list of path-value pairs part of the StructuredDataStore they address. If the parameter create_missing is True, this function can also create data structures on the fly when the given keys do not yet exist at some place in the structure. """ if self._locked: raise ValueError("modification of StructuredDataStore failed, "+\ "object is locked") for (k,v) in pairs: self.setitem(k, v, create_missing, always_dicts= always_dicts)
def __delitem__(self, path): """deletes an item from the store. This method deletes an item from the StructuredDataStore. This means you can use the statement "del store[path]" to delete an item from the store. Here is an example: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> print(p) StructuredDataStore( { 'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 1.23}]} ) >>> del p["key3[3].float"] >>> print(p) StructuredDataStore( {'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {}]} ) >>> del p["key3[3]"] >>> print(p) StructuredDataStore( {'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3]} ) """ return self.delitem(path)
[docs] def delitem(self, path): """deletes an item from the store. This is the explicit deletion method, it's function is the same as __delitem__. Here is an example: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> print(p) StructuredDataStore( { 'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 1.23}]} ) >>> p.delitem("key3[3].float") >>> print(p) StructuredDataStore( {'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {}]} ) >>> p.delitem("key3[3]") >>> print(p) StructuredDataStore( {'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3]} ) """ if self._locked: raise ValueError("modification of StructuredDataStore failed, "+\ "object is locked") keylist= StructuredDataStore.split_path(path) return self.listdelitem(keylist)
# ----------------------------------------------------- # link handling def _links_remove(self, keylist): """remove a keylist tuple from the _link_dict. """ if self._link_dict is None: return val= None try: val= self.keylist2object(keylist) except KeyError: val= None except IndexError: val= None if val is None: return val_id= id(val) keylist_set= self._link_dict.get(val_id) if keylist_set is None: return if len(keylist_set)<=2: del self._link_dict[val_id] return keylist_set.discard(tuple(keylist)) def _links_add(self, from_keylist, to_keylist, val): """add a keylist tuple to the _link_dict. """ if self._link_dict is None: return id_val= id(val) keylist_set= self._link_dict.get(id_val) if keylist_set is None: self._link_dict[id_val]= set((tuple(from_keylist), tuple(to_keylist))) else: keylist_set.add(tuple(from_keylist)) def _get_links_by_id(self, id_): """get the links from a given object-id. """ if self._link_dict is None: self.refresh_links() return self._link_dict.get(id_) # returns None or a keylist-set
[docs] def is_linked(self,path): """returns True if another path points to the same data. This function returns True if there is at least one other path that refers to the same data as the given path. Here are some examples: >>> d= {"A": {"B": 1, "C": 2}, ... "x": {"y":None}, ... "l": [["val"], 1, None] ... } >>> d["x"]["y"]= d["A"] >>> d["l"][2]= d["l"][0] >>> s= StructuredDataStore(d) >>> s.is_linked("A") True >>> s.is_linked("A.B") False >>> s.is_linked("l[2]") True """ l= self.get_links(path) return len(l)>1
# ----------------------------------------------------- # iteration across the structure
    def selection_iter(self, pattern, path_list, only_leaves,
                       sorted_, show_links,
                       paths_found_set= None):
        """iteration through a path list.

        Iterates over the explicit paths in path_list (instead of
        walking the whole store) and yields (path, keylist, value)
        tuples for those that match the pattern.

        parameters:

        pattern         a path pattern used to filter; "" matches all.
        path_list       the paths to visit; elements may also be tuples
                        or lists, in which case the first element is
                        taken as the path.
        only_leaves     if True, skip paths whose value is a collection.
        sorted_         if True, visit path_list in sorted order.
        show_links      if True, mark linked path components with "*".
        paths_found_set if given (a set), paths already in it are
                        skipped; yielded paths are added to it.

        Here are some examples:

        >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]}
        >>> p= StructuredDataStore(d)
        >>> def test(s,pattern,path_list,only_leaves,sorted_,
        ...          show_links=False, paths_found_set= None):
        ...     for l in s.selection_iter(pattern,path_list,only_leaves,
        ...                               sorted_,show_links,paths_found_set):
        ...         print(repr(l))
        >>> test(p,"",['key3[0]','key3[3]'],False,False)
        ('key3[0]', ['key3', 0], 1)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        >>> test(p,"",[('key3[0]',1),('key3[2]',3)],False,False)
        ('key3[0]', ['key3', 0], 1)
        ('key3[2]', ['key3', 2], 3)
        >>> pset= set()
        >>> test(p,"",['key3[0]','key3[3]','key3[3].float'],True,True,False,pset)
        ('key3[0]', ['key3', 0], 1)
        ('key3[3].float', ['key3', 3, 'float'], 1.23)
        >>> test(p,"",['key3[0]','key3[3]','key3[3].float'],False,True,False,pset)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        >>> d={"key1":{"A":"x1","B":"y1"},
        ...    "key2":{"A":"x2","B":"y2"},
        ...    "key3":[1,2,3,{"float":1.23}]}
        >>> p= StructuredDataStore(d)
        >>> test(p,"key1",['key1','key1.A','key2.A'],False,True)
        ('key1', ['key1'], {'A': 'x1', 'B': 'y1'})
        >>> test(p,"key1.**",['key1','key1.A','key2.A'],False,True)
        ('key1.A', ['key1', 'A'], 'x1')
        >>> test(p,"*.A",['key1','key1.A','key2.A'],False,True)
        ('key1.A', ['key1', 'A'], 'x1')
        ('key2.A', ['key2', 'A'], 'x2')
        >>> d={"key1":{"A":"x1","B":"y1"}}
        >>> d["key1"]["B"]= d["key1"]
        >>> p= StructuredDataStore(d)
        >>> test(p,"",["key1","key1.A","key1.B"],False,True)
        ('key1', ['key1'], {'A': 'x1', 'B': {...}})
        ('key1.A', ['key1', 'A'], 'x1')
        ('key1.B', ['key1', 'B'], {'A': 'x1', 'B': {...}})
        >>> test(p,"",["key1","key1.A","key1.B"],False,True,True)
        ('key1*', [('key1', '*')], {'A': 'x1', 'B': {...}})
        ('key1*.A', [('key1', '*'), 'A'], 'x1')
        ('key1*.B*', [('key1', '*'), ('B', '*')], {'A': 'x1', 'B': {...}})
        """
        # pylint: disable=too-many-locals
        def link_check(path, keylist, link_dict):
            """sets the link flags for the keys.

            parameters:

            path      The path
            keylist   The corresponding keylist
            link_dict Here we remember if a path was already checked
                      for links
            """
            if not show_links:
                return (path, keylist)
            sub_path= ""
            newkeylist= []
            for i, k in enumerate(keylist):
                sub_path= StructuredDataStore.append_path(sub_path, k)
                # consult the per-call cache first; only resolve the
                # prefix and query the link dictionary on a cache miss:
                flag= link_dict.get(sub_path)
                if flag is None:
                    v= self.keylist2object(keylist[:i+1])
                    s= self._get_links_by_id(id(v))
                    flag= (s is not None)
                    link_dict[sub_path]= flag
                if flag:
                    # linked component: tag the key with a "*"
                    newkeylist.append((k,"*"))
                else:
                    newkeylist.append(k)
                continue
            return (StructuredDataStore.join_path(newkeylist), newkeylist)
        # ----------
        pattern_list= StructuredDataStore.split_path(pattern)
        if not pattern_list:
            # an empty pattern matches everything:
            pattern_list=[ANYKEYS]
        if sorted_:
            path_list= sorted(path_list)
        link_dict= {}
        for path in path_list:
            # if elements of path_list are tuples or lists take
            # the first element:
            if is_iterable(path):
                path= path[0]
            keys= StructuredDataStore.split_path(path)
            value= self.keylist2object(keys)
            if only_leaves:
                if is_iterable(value):
                    continue
            if not SpecialKey.list_match(pattern_list, keys):
                continue
            (npath, nkeys)= link_check(path, keys, link_dict)
            if paths_found_set is None:
                yield (npath, nkeys, value)
            else:
                # de-duplicate against paths already yielded earlier:
                if path not in paths_found_set:
                    yield (npath,nkeys,value)
                    paths_found_set.add(path)
    def direct_iter(self, pattern, only_leaves, sorted_, show_links,
                    paths_found_set= None):
        """Iteration through the StructuredDataStore.

        This method returns an iterator on all the nodes of the
        StructuredDataStore. For each iteration it returns a tuple
        consisting of the path, the keylist and the value of that node.

        parameters:

        pattern         A path pattern that is used to filter the
                        results. If this is None or an empty string,
                        the results are not filtered.
        only_leaves     If this is true, only nodes that are scalars
                        are returned.
        sorted_         If this is true, iteration through map keys is
                        sorted.
        show_links      If this is true, links are marked in the
                        result.
        paths_found_set If this parameter is given, it should be a set.
                        Paths that already are in this set are not
                        returned. Paths found are added to this set.

        returns: a tuple (path,keys,value)

        IMPORTANT NOTE: The keys returned at each iteration are NOT a
        new list but the same list with a new content at each
        iteration. If you use this iterator to store the keylist
        somewhere keep in mind that you have to make a shallow copy of
        the keylist like in "my_keylist= keylist[:]". Note that list
        comprehension WILL NOT work due to this reason.

        Here are some examples:

        >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]}
        >>> p= StructuredDataStore(d)
        >>> def test(s,pattern,only_leaves,sorted_,
        ...          show_links=False, paths_found_set= None):
        ...     for l in s.direct_iter(pattern,only_leaves,
        ...                            sorted_,show_links,paths_found_set):
        ...         print(repr(l))
        >>> test(p,"",False,True)
        ('key1', ['key1'], 1)
        ('key2', ['key2'], {'A': 'x', 'B': 'y'})
        ('key2.A', ['key2', 'A'], 'x')
        ('key2.B', ['key2', 'B'], 'y')
        ('key3', ['key3'], [1, 2, 3, {'float': 1.23}])
        ('key3[0]', ['key3', 0], 1)
        ('key3[1]', ['key3', 1], 2)
        ('key3[2]', ['key3', 2], 3)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        ('key3[3].float', ['key3', 3, 'float'], 1.23)
        >>> pset= set()
        >>> test(p,"",True,True,False,pset)
        ('key1', ['key1'], 1)
        ('key2.A', ['key2', 'A'], 'x')
        ('key2.B', ['key2', 'B'], 'y')
        ('key3[0]', ['key3', 0], 1)
        ('key3[1]', ['key3', 1], 2)
        ('key3[2]', ['key3', 2], 3)
        ('key3[3].float', ['key3', 3, 'float'], 1.23)
        >>> test(p,"",False,True,False,pset)
        ('key2', ['key2'], {'A': 'x', 'B': 'y'})
        ('key3', ['key3'], [1, 2, 3, {'float': 1.23}])
        ('key3[3]', ['key3', 3], {'float': 1.23})
        >>> d={"key0":1,
        ...    "key1":{"A":"x1","B":"y1"},
        ...    "key2":{"A":"x2","B":"y2"},
        ...    "key3":[1,2,3,{"float":1.23}]}
        >>> p= StructuredDataStore(d)
        >>> test(p,"",False,True)
        ('key0', ['key0'], 1)
        ('key1', ['key1'], {'A': 'x1', 'B': 'y1'})
        ('key1.A', ['key1', 'A'], 'x1')
        ('key1.B', ['key1', 'B'], 'y1')
        ('key2', ['key2'], {'A': 'x2', 'B': 'y2'})
        ('key2.A', ['key2', 'A'], 'x2')
        ('key2.B', ['key2', 'B'], 'y2')
        ('key3', ['key3'], [1, 2, 3, {'float': 1.23}])
        ('key3[0]', ['key3', 0], 1)
        ('key3[1]', ['key3', 1], 2)
        ('key3[2]', ['key3', 2], 3)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        ('key3[3].float', ['key3', 3, 'float'], 1.23)
        >>> test(p,"key2",False,True)
        ('key2', ['key2'], {'A': 'x2', 'B': 'y2'})
        >>> test(p,"key2.**",False,True)
        ('key2.A', ['key2', 'A'], 'x2')
        ('key2.B', ['key2', 'B'], 'y2')
        >>> test(p,"key3",False,True)
        ('key3', ['key3'], [1, 2, 3, {'float': 1.23}])
        >>> test(p,"key3.*",False,True)
        ('key3[0]', ['key3', 0], 1)
        ('key3[1]', ['key3', 1], 2)
        ('key3[2]', ['key3', 2], 3)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        >>> test(p,"key3.**",False,True)
        ('key3[0]', ['key3', 0], 1)
        ('key3[1]', ['key3', 1], 2)
        ('key3[2]', ['key3', 2], 3)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        ('key3[3].float', ['key3', 3, 'float'], 1.23)
        >>> test(p,"*.A",False,True)
        ('key1.A', ['key1', 'A'], 'x1')
        ('key2.A', ['key2', 'A'], 'x2')
        >>> test(p,"*[0]",False,True)
        ('key3[0]', ['key3', 0], 1)
        >>> test(p,"*[3].**",False,True)
        ('key3[3].float', ['key3', 3, 'float'], 1.23)
        >>> d={"key1":{"A":"x1","B":"y1"}}
        >>> d["key1"]["B"]= d["key1"]
        >>> p= StructuredDataStore(d)
        >>> test(p,"",False,True)
        ('key1', ['key1'], {'A': 'x1', 'B': {...}})
        ('key1.A', ['key1', 'A'], 'x1')
        ('key1.B', ['key1', 'B'], {'A': 'x1', 'B': {...}})
        >>> test(p,"",False,True,True)
        ('key1*', [('key1', '*')], {'A': 'x1', 'B': {...}})
        ('key1*.A', [('key1', '*'), 'A'], 'x1')
        ('key1*.B*', [('key1', '*'), ('B', '*')], {'A': 'x1', 'B': {...}})
        """
        # pylint: disable= too-many-locals, too-many-branches
        # pylint: disable= too-many-statements
        def pattern_iterator(val, patternkey, sorted_, show_links):
            """iterator with a given pattern.

            Yields (key, value) children of val that match patternkey.
            val MUST be a collection !
            """
            def k_chg(k,v):
                """key change ?

                Tags the key with "*" when v is a linked object and
                show_links is active.
                """
                if not show_links:
                    return (k,v)
                s= self._get_links_by_id(id(v))
                if s is None:
                    return (k,v)
                return ((k,"*"),v)
            # ---------
            # pylint: disable= no-else-return
            if isinstance(patternkey, SpecialKey):
                if patternkey.simple_wildcard():
                    # "*": yield every matching direct child
                    for (k,v) in dictutils.key_val_iterator(val,
                                                            sorted_=sorted_):
                        if patternkey.match(k):
                            yield k_chg(k,v)
                    return
                elif patternkey.recursive_wildcard():
                    # "**": yield every direct child, recursion is
                    # handled by the outer loop
                    for (k,v) in dictutils.key_val_iterator(val,
                                                            sorted_=sorted_):
                        yield k_chg(k,v)
                    return
                else:
                    raise ValueError("unexpected specialkey: %s" % \
                                     repr(patternkey))
            else:
                # a literal key: yield at most the one matching child
                try:
                    v= val[patternkey]
                except KeyError:
                    return
                except TypeError:
                    return
                yield k_chg(patternkey, v)
                return
        # -----------------
        # iterative depth-first walk with an explicit stack of
        # child-iterators; keys/paths/iterators grow and shrink in
        # lock-step with the current depth:
        depth= 0
        pattern_list= StructuredDataStore.split_path(pattern)
        if not pattern_list:
            pattern_list=[ANYKEYS]
        pattern_len= len(pattern_list)
        pattern_key= pattern_list[depth]
        paths= [""]
        path= ""
        keys= [None]
        value= self._data
        # id_set/id_list guard against infinite recursion on
        # self-referencing (linked) structures:
        id_set= set((id(self._data),))
        id_list= [id(self._data)]
        iterators= [pattern_iterator(value, pattern_key, sorted_,
                                     show_links)]
        while True:
            try:
                (keys[-1],value)= next(iterators[-1])
            except StopIteration:
                # current level exhausted: pop one stack frame
                iterators.pop()
                keys.pop()
                paths.pop()
                id_set.discard( id_list.pop() )
                depth-=1
                if depth<0:
                    return
                if depth+1>=pattern_len:
                    pattern_key= pattern_list[-1]
                    # can only happen with ANYKEYS
                else:
                    pattern_key= pattern_list[depth+1]
                continue
            is_collection= is_iterable(value)
            path= StructuredDataStore.append_path(paths[-1], keys[-1])
            if (not only_leaves) or (not is_collection):
                # yield only when the full pattern is consumed or we
                # are inside a recursive ("**") wildcard:
                if (depth == pattern_len-1) or \
                   SpecialKey.is_recursive_wildcard(pattern_key):
                    if paths_found_set is None:
                        yield (path,keys,value)
                    else:
                        if path not in paths_found_set:
                            yield (path,keys,value)
                            paths_found_set.add(path)
            if is_collection:
                if id(value) in id_set:
                    # already on the current stack: a cycle, skip
                    continue
                if not SpecialKey.is_recursive_wildcard(pattern_key):
                    if depth+1>=pattern_len:
                        continue
                    pattern_key= pattern_list[depth+1]
                # descend: push a new stack frame for this collection
                id_list.append(id(value))
                id_set.add(id(value))
                keys.append(None)
                paths.append(path)
                iterators.append(pattern_iterator(
                                     value,
                                     pattern_key,
                                     sorted_,
                                     show_links))
                depth+=1
    def universal_iter(self, patterns, path_list, only_leaves,
                       sorted_, show_links):
        """universal iteration through the StructuredDataStore with path_list.

        Combines direct_iter and selection_iter: if path_list is empty
        or None the whole store is walked, otherwise only the given
        paths are visited. patterns may be a single pattern string or a
        list of patterns; with a list, duplicate results across the
        patterns are suppressed.

        Here are some examples:

        >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]}
        >>> p= StructuredDataStore(d)
        >>> def test(s,patterns,path_list,only_leaves,sorted_,
        ...          show_links=False):
        ...     for l in s.universal_iter(patterns,path_list,only_leaves,
        ...                               sorted_,show_links):
        ...         print(repr(l))
        ...
        >>> test(p,"",['key3[0]','key3[3]'],False,True)
        ('key3[0]', ['key3', 0], 1)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        >>> test(p,"",None,False,True)
        ('key1', ['key1'], 1)
        ('key2', ['key2'], {'A': 'x', 'B': 'y'})
        ('key2.A', ['key2', 'A'], 'x')
        ('key2.B', ['key2', 'B'], 'y')
        ('key3', ['key3'], [1, 2, 3, {'float': 1.23}])
        ('key3[0]', ['key3', 0], 1)
        ('key3[1]', ['key3', 1], 2)
        ('key3[2]', ['key3', 2], 3)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        ('key3[3].float', ['key3', 3, 'float'], 1.23)
        >>> test(p,"key3.**",['key3','key3[0]','key3[3]'],False,True)
        ('key3[0]', ['key3', 0], 1)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        >>> test(p,"key3.**",[],False,True)
        ('key3[0]', ['key3', 0], 1)
        ('key3[1]', ['key3', 1], 2)
        ('key3[2]', ['key3', 2], 3)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        ('key3[3].float', ['key3', 3, 'float'], 1.23)
        >>> test(p,["key2.**","key3.**"],[],False,True)
        ('key2.A', ['key2', 'A'], 'x')
        ('key2.B', ['key2', 'B'], 'y')
        ('key3[0]', ['key3', 0], 1)
        ('key3[1]', ['key3', 1], 2)
        ('key3[2]', ['key3', 2], 3)
        ('key3[3]', ['key3', 3], {'float': 1.23})
        ('key3[3].float', ['key3', 3, 'float'], 1.23)
        """
        if not is_iterable(patterns):
            # a single pattern (or empty): no de-duplication needed
            if not patterns:
                patterns= [""]
            else:
                patterns= [patterns]
            paths_found_set= None
        else:
            # several patterns may match the same node; share a set to
            # suppress duplicates:
            paths_found_set= set()
        for pattern in patterns:
            if not path_list:
                iter_= self.direct_iter(pattern= pattern,
                                        only_leaves= only_leaves,
                                        sorted_= sorted_,
                                        show_links= show_links,
                                        paths_found_set= paths_found_set)
            else:
                iter_= self.selection_iter(pattern= pattern,
                                           path_list= path_list,
                                           only_leaves= only_leaves,
                                           sorted_= sorted_,
                                           show_links= show_links,
                                           paths_found_set= paths_found_set)
            for(path,keylist,val) in iter_:
                yield(path,keylist,val)
# ----------------------------------------------------- # path pattern support # ----------------------------------------------------- # searching paths def _pattern_search(self, pattern, is_rx_pattern, path_list= None, add_values= False, only_leaves=True, show_links= False): """search all paths for a given pattern. """ if is_rx_pattern: rx= re.compile(pattern) match= lambda st: rx.match(st) is not None else: words= pattern.split() match= lambda st: andmatch(st, words) matched= [] for (path,_,val) in self.universal_iter(patterns="", path_list= path_list, only_leaves= only_leaves, sorted_= False, show_links= show_links): if not match(path): continue if add_values: matched.append((path, val)) else: matched.append(path) return matched # ----------------------------------------------------- # searching values def _pattern_value_search(self, pattern, is_rx_pattern, path_patterns= None, path_list= None, add_values= False, show_links= False): """search for a value with a pattern. """ if is_rx_pattern: rx= re.compile(pattern) match= lambda st: rx.match(st) is not None else: words= pattern.split() match= lambda st: andmatch(st, words) matched= [] for (path,_,val) in self.universal_iter(patterns= path_patterns, path_list= path_list, only_leaves= True, sorted_= False, show_links= show_links): if not match(str(val)): continue if add_values: matched.append((path, val)) else: matched.append(path) return matched # ----------------------------------------------------- # path utilities @staticmethod def _keyconvert(key, include_separator=False): if include_separator: separator= "." else: separator= "" if isinstance(key, tuple): extra= key[1] key= key[0] else: extra= "" if isinstance(key, SpecialKey): key= str(key) elif isinstance(key, str): key= escape_in_stringkey(key) elif isinstance(key, int): separator= "" key= "[%d]" % key else: key= str(key) return "".join((separator,key,extra))
[docs] @staticmethod def join_path(keylist): r"""joins a path from a keylist. This method joins a list of keys to a path. It doesn't need access to the StructuredDataStore so it is a static method. Usually each key of the keylist is a SpecialKey object, a string or an integer or a pair. In the latter case the pair consists of a SpecialKey, a string or an integer and an extra string. All these are combined to form a StructuredData path. Here are some examples: >>> print(StructuredDataStore.join_path(["A","B"])) A.B >>> print(StructuredDataStore.join_path(["A.B","C"])) A\.B.C >>> print(StructuredDataStore.join_path(["A",2,"C"])) A[2].C >>> print(StructuredDataStore.join_path(["A","*","C"])) A.\*.C >>> print(StructuredDataStore.join_path(["A","x[3]","C"])) A.x\[3\].C >>> print(StructuredDataStore.join_path(["A",ANYKEY,"B"])) A.*.B """ path= ".".join([StructuredDataStore._keyconvert(x) for x in keylist]) # avoid things like "A.[3]B": return path.replace(".[","[")
[docs] @staticmethod def append_path(path, key): r"""append a key to a path. The key is a path key or a pair of a path key and an extra string. Here are some examples: >>> print(StructuredDataStore.append_path("a.b","c")) a.b.c >>> print(StructuredDataStore.append_path("a.b",3)) a.b[3] >>> print(StructuredDataStore.append_path("a[1]","c")) a[1].c >>> print(StructuredDataStore.append_path("a[1]",2)) a[1][2] >>> print(StructuredDataStore.append_path("","c")) c >>> print(StructuredDataStore.append_path("",3)) [3] >>> print(StructuredDataStore.append_path("a[1]","x.y")) a[1].x\.y >>> print(StructuredDataStore.append_path("a[1]","*")) a[1].\* >>> print(StructuredDataStore.append_path("a[1]","**")) a[1].\** """ return path + StructuredDataStore._keyconvert(key, bool(path))
    def annotated_join_path(self,keylist):
        """similar to join_path, but give hints on links.

        All keys in the path that relate to a linked object, one that is
        referred by other paths as well, get a "*" appended at the end.
        Note that list indices get the star appended to the closing
        bracket.

        Raises KeyError when an intermediate element of the keylist
        addresses a scalar.

        Here are some examples:

        >>> d= {"A": {"B": 1, "C": None},
        ...      "x": {"y":None},
        ...      "l": [["val"], 1, None]
        ...    }
        >>> d["x"]["y"]= d["A"]
        >>> d["l"][2]= d["l"][0]
        >>> d["A"]["C"]= d["l"][0]
        >>> p= StructuredDataStore(d)
        >>> print(p)
        StructuredDataStore(
          { 'A': {'B': 1, 'C': ['val']},
            'l': [['val'], 1, ['val']],
            'x': {'y': {'B': 1, 'C': ['val']}}}
        )
        >>> p.annotated_join_path(["A","B"])
        'A*.B'
        >>> p.annotated_join_path(["A","B","C"])
        Traceback (most recent call last):
            ...
        KeyError: 'at key C, item is a scalar'
        >>> p.annotated_join_path(["A","C"])
        'A*.C*'
        >>> p.annotated_join_path(["l",0])
        'l[0]*'
        >>> p.annotated_join_path(["l",1])
        'l[1]'
        >>> p.annotated_join_path(["l",2])
        'l[2]*'
        >>> p.annotated_join_path(["x"])
        'x'
        >>> p.annotated_join_path(["x","y"])
        'x.y*'
        """
        # the link dictionary is computed lazily:
        if self._link_dict is None:
            self.refresh_links()
        current= self._data
        new_keylist= []
        for k in keylist:
            if not hasattr(current, "__getitem__"):
                # current is a scalar, cannot descend further:
                raise KeyError("at key %s, item is a scalar" % k)
            current= current[k] # works for dicts and lists
            # is the object at this position referenced elsewhere too?
            s= self._get_links_by_id(id(current))
            if s is not None:
                # pair form (key, "*") makes _keyconvert append a star:
                new_keylist.append((k,"*"))
            else:
                new_keylist.append(k)
        return StructuredDataStore.join_path(new_keylist)
    @staticmethod
    def split_path(path):
        r"""splits a path into a keylist.

        This static method splits a given path to a list of keys. Note
        that in the returned list, strings are map keys and integers are
        list indices. If the path is a pattern, the returned keylist may
        contain SpecialKey objects like ANYKEY or ANYKEYS.

        >>> print(StructuredDataStore.split_path("A.B"))
        ['A', 'B']
        >>> print(StructuredDataStore.split_path(""))
        []
        >>> print(StructuredDataStore.split_path(r"A\.B.C"))
        ['A.B', 'C']
        >>> print(StructuredDataStore.split_path("A[2].C"))
        ['A', 2, 'C']
        >>> StructuredDataStore.split_path("A[xy].C")
        ['A', '[xy]', 'C']
        >>> StructuredDataStore.split_path("A[4][3].C")
        ['A', 4, 3, 'C']
        >>> StructuredDataStore.split_path("A.*.C")
        ['A', ANYKEY, 'C']
        >>> StructuredDataStore.split_path("A.**.C")
        ['A', ANYKEYS, 'C']
        """
        if not path: # empty string or None
            return []
        # insert a "." before each "[" so that "[n]" index parts become
        # separate elements when splitting at the dots:
        path= StructuredDataStore._re_br.sub(r".\1", path)
        lst= StructuredDataStore._re_split.split(path)
        res= []
        for elm in lst:
            m= StructuredDataStore._re_no.match(elm)
            if m is not None:
                # "[n]" with a numeric n -> integer list index:
                elm= int(m.group(1))
            else:
                # returns the (unescaped) string or a SpecialKey
                # object for wildcards like "*" / "**":
                elm= string_to_keyobject(elm)
            res.append(elm)
        return res
# ----------------------------------------------------- # printing/dumping # pylint: disable= line-too-long def __repr__(self): """return representation of object. This returns a python expression that can be used to recreate the object. Here is an example: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> print(repr(p)) StructuredDataStore({'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 1.23}]}) """ return "StructuredDataStore(%s)" % Repr.repr(self._data) # pylint: enable= line-too-long def __str__(self): """return a more human readable representation of the object. Here is an example: >>> d={"key1":1, "key2": {"A":"x","B":"y"}, "key3":[1,2,3,{"float":1.23}]} >>> p= StructuredDataStore(d) >>> print(str(p)) StructuredDataStore( { 'key1': 1, 'key2': {'A': 'x', 'B': 'y'}, 'key3': [1, 2, 3, {'float': 1.23}]} ) """ lines= ["StructuredDataStore("] l= pprint.pformat(self._data, indent=2, width=70).split("\n") if len(l)==1: if len(l[0])<60: return "StructuredDataStore(%s)" % l[0] l[-1]+="\n)" lines.extend(l) return "\n ".join(lines)
def _get_dict(dict_,key): """get dict_[key] or create this entry as a new dict.""" res= dict_.get(key) if res is not None: return res new= {} dict_[key]= new return new def _get_list(dict_,key): """get dict_[key] or create this entry as a new list.""" res= dict_.get(key) if res is not None: return res new= [] dict_[key]= new return new def _sorted_unique_insert(lst, val): """insert an element in a list and keep the list sorted. It is also ensured that no element appears in the list twice. Here are some examples: >>> l=[] >>> _sorted_unique_insert(l,4) >>> l [4] >>> _sorted_unique_insert(l,4) >>> l [4] >>> _sorted_unique_insert(l,6) >>> l [4, 6] >>> _sorted_unique_insert(l,6) >>> l [4, 6] >>> _sorted_unique_insert(l,4) >>> l [4, 6] >>> _sorted_unique_insert(l,5) >>> l [4, 5, 6] >>> _sorted_unique_insert(l,5) >>> l [4, 5, 6] >>> _sorted_unique_insert(l,4) >>> l [4, 5, 6] >>> _sorted_unique_insert(l,6) >>> l [4, 5, 6] >>> _sorted_unique_insert(l,2) >>> l [2, 4, 5, 6] >>> _sorted_unique_insert(l,3) >>> l [2, 3, 4, 5, 6] >>> _sorted_unique_insert(l,10) >>> l [2, 3, 4, 5, 6, 10] """ if not lst: lst.append(val) return l= len(lst) for i in range(l): if val<=lst[i]: if val==lst[i]: return lst.insert(i, val) return lst.append(val)
class MatchPaths():
    """This class provides a store for paths and a match function.

    This class provides a store for paths that can be used to find
    matching paths for a given path. A path within the MatchPaths object
    may contain wildcards, "*" matches any map key and any list index.
    Each path in a MatchPaths object is associated with an arbitrary
    value.

    Here is the algorithm for the path matching:

    take all MatchPaths paths with the same length as path

    from the first element of the matchpath and the path to the last
    element:

        take all matchpaths where the current element is the same as the
        element in path

        if there is no matchpath that matches take all matchpaths that
        have a wildcard as current element

        continue the loop with the now smaller list of matchpaths

    The paths are internally stored in a different format in order to
    make the match function efficient. However, the MatchPaths object
    can be converted back to a normal dictionary.

    A path should be a StructuredDataStore - path, see also the
    documentation there.

    This object manages a list of paths where each path is mapped to an
    arbitrary object.
    """
    @staticmethod
    def _convert_structure_dict(dict_):
        """convert a structure dict to a path dict.

        Flattens a nested dict of keys into a flat dict that maps
        complete path strings to values.  The special root key maps to
        the top-level value.
        """
        def treewalk(d, keylist, subdict):
            """walk the tree."""
            for (k,v) in subdict.items():
                if k==root_key_str:
                    if not keylist:
                        # root key at top level stays the root key:
                        d[root_key_str]= v
                    else:
                        d[StructuredDataStore.join_path(keylist)]= v
                else:
                    # descend with the key appended to the path so far:
                    treewalk(d, keylist+[k], v)
        d= {}
        treewalk(d, [], dict_)
        return d
[docs] def clone(self, deepcopy= True): """clones a MatchPaths object. Here is an example: >>> d={"#": "top level", ... "a": "top level 'a'", ... "*.*": "level 2, '*.*'", ... "a.*": "level 2, 'a.*'", ... } >>> m=MatchPaths(d) >>> print(str(m)) MatchPaths( { '#': 'top level', '*.*': "level 2, '*.*'", 'a': "top level 'a'", 'a.*': "level 2, 'a.*'"} ) >>> shallow= m.clone(False) >>> deep= m.clone(True) >>> m["*.*"]= "NEW!!" >>> print(str(m)) MatchPaths( { '#': 'top level', '*.*': 'NEW!!', 'a': "top level 'a'", 'a.*': "level 2, 'a.*'"} ) >>> print(str(shallow)) MatchPaths( { '#': 'top level', '*.*': 'NEW!!', 'a': "top level 'a'", 'a.*': "level 2, 'a.*'"} ) >>> print(str(deep)) MatchPaths( { '#': 'top level', '*.*': "level 2, '*.*'", 'a': "top level 'a'", 'a.*': "level 2, 'a.*'"} ) """ new= self.__class__() # pylint: disable= protected-access if not deepcopy: new._data= copy.copy(self._data) else: new._data= copy.deepcopy(self._data) return new
def __init__(self, spec=None, check_func= None): """initialize the object from spec. This initializes the object. A python dictionary can be given as an optional parameter in order to initialize the object. """ self._data= {} self._locked= False if spec is not None: MatchPaths.add(self, spec, check_func)
    def lock(self):
        """make the MatchPaths object read-only.

        After this call, add(), iterator_add(), __setitem__ and
        __delitem__ raise ValueError.
        """
        self._locked= True
    def m_direct_iter(self, pattern= "", length= None, paths_found_set= None):
        r"""walk through all stored paths.

        Yields (path, keylist, value) tuples. SpecialKey objects are
        returned as objects.

        pattern         -- optional path pattern restricting the walk;
                           may end in "**" to match all longer paths.
        length          -- optional path length; only paths with exactly
                           this number of keys are visited.
        paths_found_set -- optional set of already-reported paths; paths
                           in the set are skipped and newly reported
                           paths are added to it.

        IMPORTANT NOTE: The keylist returned at each iteration is NOT a
        new list but the same list with a new content at each iteration.
        If you use this iterator to store the keylist somewhere keep in
        mind that you have to make a shallow copy of the keylist like in
        "my_keylist= keylist[:]". Note that list comprehension
        [x for x in s.m_direct_iter] WILL NOT work due to this reason.

        Here are some examples:

        >>> d={"#": "top level",
        ...    "*": "top level, '*'",
        ...    "a": "top level 'a'",
        ...    "*.*": "level 2, '*.*'",
        ...    "a.*": "level 2, 'a.*'",
        ...    "n.*": "level 2, 'n.*",
        ...    "n[2]": "level 2, 'n[2]",
        ...    "a.b.C": "level 3, a.b.C",
        ...    "a.*.c": "level 3, a.*.c",
        ...    "a.*.*": {"mykey": 2 }
        ...   }
        >>> m=MatchPaths(d)
        >>> def p(m,pattern,length,pathset=None):
        ...     res= [(p,kl[:],v) for (p,kl,v) in m.m_direct_iter(
        ...                 pattern,
        ...                 length= length,
        ...                 paths_found_set= pathset)]
        ...     res.sort(key= lambda x: x[0])
        ...     print(Repr.repr(res).replace("),","),\n"))
        >>> p(m,"",None)
        [('#', [ROOTKEY], 'top level'),
         ('*', [ANYKEY], "top level, '*'"),
         ('*.*', [ANYKEY, ANYKEY], "level 2, '*.*'"),
         ('a', ['a'], "top level 'a'"),
         ('a.*', ['a', ANYKEY], "level 2, 'a.*'"),
         ('a.*.*', ['a', ANYKEY, ANYKEY], {'mykey': 2}),
         ('a.*.c', ['a', ANYKEY, 'c'], 'level 3, a.*.c'),
         ('a.b.C', ['a', 'b', 'C'], 'level 3, a.b.C'),
         ('n.*', ['n', ANYKEY], "level 2, 'n.*"),
         ('n[2]', ['n', 2], "level 2, 'n[2]")]
        >>> p(m,"",2)
        [('*.*', [ANYKEY, ANYKEY], "level 2, '*.*'"),
         ('a.*', ['a', ANYKEY], "level 2, 'a.*'"),
         ('n.*', ['n', ANYKEY], "level 2, 'n.*"),
         ('n[2]', ['n', 2], "level 2, 'n[2]")]
        >>> p(m,"",5)
        []
        >>> p(m,"",3)
        [('a.*.*', ['a', ANYKEY, ANYKEY], {'mykey': 2}),
         ('a.*.c', ['a', ANYKEY, 'c'], 'level 3, a.*.c'),
         ('a.b.C', ['a', 'b', 'C'], 'level 3, a.b.C')]
        >>> p(m,"a.*",None)
        [('a.*', ['a', ANYKEY], "level 2, 'a.*'")]
        >>> p(m,"a.**",None)
        [('a.*', ['a', ANYKEY], "level 2, 'a.*'"),
         ('a.*.*', ['a', ANYKEY, ANYKEY], {'mykey': 2}),
         ('a.*.c', ['a', ANYKEY, 'c'], 'level 3, a.*.c'),
         ('a.b.C', ['a', 'b', 'C'], 'level 3, a.b.C')]
        >>> p(m,"a.**",2)
        [('a.*', ['a', ANYKEY], "level 2, 'a.*'")]
        >>> p(m,"a.**",3)
        [('a.*.*', ['a', ANYKEY, ANYKEY], {'mykey': 2}),
         ('a.*.c', ['a', ANYKEY, 'c'], 'level 3, a.*.c'),
         ('a.b.C', ['a', 'b', 'C'], 'level 3, a.b.C')]
        >>> p(m,"**",1)
        [('*', [ANYKEY], "top level, '*'"),
         ('a', ['a'], "top level 'a'")]
        >>> p(m,"**",0)
        []
        >>> p(m,"",0)
        [('#', [ROOTKEY], 'top level')]
        >>> pset= set()
        >>> p(m,"a.b.*",None,pset)
        [('a.b.C', ['a', 'b', 'C'], 'level 3, a.b.C')]
        >>> p(m,"a.**",None,pset)
        [('a.*', ['a', ANYKEY], "level 2, 'a.*'"),
         ('a.*.*', ['a', ANYKEY, ANYKEY], {'mykey': 2}),
         ('a.*.c', ['a', ANYKEY, 'c'], 'level 3, a.*.c')]
        """
        # pylint: disable= too-many-locals, too-many-branches, too-many-statements
        def pattern_iterator(val, patternkey):
            """iterator with a given patternkey.

            Yields the (key, value) items of val that match patternkey:
            - a simple wildcard ("*") yields matching items,
            - a recursive wildcard ("**") yields all items,
            - a concrete key yields at most the single matching item.
            """
            # pylint: disable= no-else-return
            if isinstance(patternkey, SpecialKey):
                if patternkey.simple_wildcard():
                    for (k,v) in val.items():
                        if patternkey.match(k):
                            yield (k,v)
                    return
                elif patternkey.recursive_wildcard():
                    for (k,v) in val.items():
                        yield (k,v)
                    return
                else:
                    raise ValueError("unexpected specialkey: %s" % \
                                     repr(patternkey))
            else:
                try:
                    v= val[patternkey]
                except KeyError:
                    return
                except TypeError:
                    return
                yield (patternkey, v)
                return
        # ---------
        # determine the set of path lengths to visit:
        if (not length) and (length!=0):
            lengths= list(self._data.keys())
        else:
            if length not in self._data:
                return
            lengths= [length]
        if pattern:
            pattern_list= StructuredDataStore.split_path(pattern)
            if not pattern_list:
                pattern_list=[]
                pattern_anykeys= True
            else:
                # trailing "**" makes the pattern match longer paths too:
                pattern_anykeys= (pattern_list[-1]==ANYKEYS)
            pattern_len= len(pattern_list)
            nl= []
            for l in lengths:
                # pylint: disable= no-else-continue
                if l<pattern_len:
                    continue
                elif (l==pattern_len) or pattern_anykeys:
                    nl.append(l)
            lengths= nl
        else:
            # no pattern: a single recursive wildcard matches everything
            pattern_list=[ANYKEYS]
            pattern_len= len(pattern_list)
        if not lengths:
            return
        #print("LEN:",repr(lengths))
        for l in lengths:
            d= self._data.get(l)
            if d is None:
                raise AssertionError("d is None!")
            if l==0:
                # if l==0 is in lengths, there cannot be
                # a pattern given, so output the ROOTKEY anyway:
                if paths_found_set is None:
                    yield (str(ROOTKEY), [ROOTKEY], d)
                else:
                    # NOTE(review): unlike the branch for l>0 below,
                    # this adds the root path to the set WITHOUT
                    # yielding it first -- looks like a missing yield,
                    # confirm intended behavior before changing.
                    if str(ROOTKEY) not in paths_found_set:
                        paths_found_set.add(str(ROOTKEY))
                continue
            # iterative depth-first walk with an explicit stack of
            # iterators, paths and keys (one entry per depth level):
            depth= 0
            paths=[""]
            keylist= [None]
            pattern_key= pattern_list[depth]
            iterators= [pattern_iterator(d, pattern_key)]
            #print("len",length," iterators:",repr(iterators))
            while iterators:
                try:
                    (k,v)= next(iterators[-1])
                    if k is None:
                        continue # ignore "None" key (wildcard order list)
                    keylist[-1]= k
                    path= StructuredDataStore.append_path(paths[-1],
                                                          keylist[-1])
                    #print("(k,v):",repr(keylist),repr(v))
                except StopIteration:
                    # current level exhausted, pop back one level:
                    iterators.pop()
                    depth-=1
                    if depth>=pattern_len:
                        pattern_key= pattern_list[-1]
                        # can only happen with ANYKEYS
                    else:
                        pattern_key= pattern_list[depth]
                    keylist.pop()
                    paths.pop()
                    continue
                #print("d, (k,v):",depth,repr(k),repr(v))
                if depth+1==l:
                    # reached the requested path length -> report:
                    if paths_found_set is None:
                        yield (path, keylist, v)
                    else:
                        if path not in paths_found_set:
                            yield (path, keylist, v)
                            paths_found_set.add(path)
                elif depth+1<l:
                    # descend one level deeper:
                    depth+=1
                    if depth>=pattern_len:
                        pattern_key= pattern_list[-1]
                        # can only happen with ANYKEYS
                    else:
                        pattern_key= pattern_list[depth]
                    #print("pattern_key:",repr(pattern_key))
                    iterators.append(pattern_iterator(v, pattern_key))
                    keylist.append(None)
                    paths.append(path)
        return # implies StopIteration
[docs] def add(self, spec, check_func= None): """add paths, spec must be a dict or a list of pairs. This method adds paths and values from a given dictionary or a given list of pairs. A check function may be provided by the optional parameter check_func. This function is called on each value found in the given dictionary or list (note: only for the value, not for the path). The user may choose to raise an exception in the check function, if a value doesn't match certain criteria. Here are some examples: >>> m=MatchPaths({"A":"a"}) >>> m.add({"B.C":"x","D.E":"y"}) >>> print(m) MatchPaths({'A': 'a', 'B.C': 'x', 'D.E': 'y'}) >>> m=MatchPaths({"A":"a"}) >>> m.add((("B.C","x"),("D.E","y"))) >>> print(m) MatchPaths({'A': 'a', 'B.C': 'x', 'D.E': 'y'}) >>> m=MatchPaths({"A":"a"}) >>> m.add([("B.C","x"),("D.E","y")]) >>> print(m) MatchPaths({'A': 'a', 'B.C': 'x', 'D.E': 'y'}) >>> m.add({"*": "top wildcard", "A.*" : "a-wildcard", "B.C.*": "BC wildcard"}) >>> print(m) MatchPaths( { '*': 'top wildcard', 'A': 'a', 'A.*': 'a-wildcard', 'B.C': 'x', 'B.C.*': 'BC wildcard', 'D.E': 'y'} ) """ if self._locked: raise ValueError(("modification of %s failed, "+\ "object is locked") % self.__class__.__name__) if hasattr(spec, "items"): it= list(spec.items()) elif is_iterable(spec): it= spec.__iter__() else: raise TypeError("spec must be a dict or a list or tuple of pairs.") MatchPaths.iterator_add(self, it, check_func)
[docs] def iterator_add(self, iterator, check_func= None): """add typespecs from an iterator to the MatchPaths. This method adds paths from a iterator. The iterator must provide pairs consisting of a path and a value. A check function may be provided by the optional parameter check_func. This function is called on each value found in the given dictionary or list (note: only for the value, not for the path). The user may choose to raise an exception in the check function, if a value doesn't match certain criteria. No testcode here since this is implicitly tested by the testcode of method "add". """ if self._locked: raise ValueError(("modification of %s failed, "+\ "object is locked") % self.__class__.__name__) d= None for (k,v) in iterator: if check_func is not None: check_func(v) if k==root_key_str: # top node type specification self._data[0]= v continue # split the path into single keys: keylist= StructuredDataStore.split_path(k) # get the dict with Paths of this length: d= _get_dict(self._data, len(keylist)) last= len(keylist)-1 for i in range(last+1): key= keylist[i] if isinstance(key, SpecialKey): if not SpecialKey.simple_wildcard(key): raise ValueError(("key '%s' not allowed in type" +\ "path") % str(key)) l= _get_list(d, None) # create a new list, add to the # <None> key. This list defines # the sort order. _sorted_unique_insert(l, key) # keep this list of # Wildcards sorted if i<last: d= _get_dict(d, key) else: d[key]= v
    def __getitem__(self, path):
        """return the value of a given path.

        Returns None when the path (taken literally, wildcards are NOT
        expanded here) is not stored.

        Here are some examples:

        >>> d={"#": "top level",
        ...    "*": "top level, '*'",
        ...    "a": "top level 'a'",
        ...    "*.*": "level 2, '*.*'",
        ...    "a.*": "level 2, 'a.*'",
        ...    "n.*": "level 2, 'n.*",
        ...    "n[2]": "level 2, 'n[2]",
        ...    "a.b.C": "level 3, a.b.C",
        ...    "a.*.c": "level 3, a.*.c",
        ...    "a.*.*": "level 3, a.*.*"
        ...   }
        >>> m=MatchPaths(d)
        >>> m["#"]
        'top level'
        >>> m["*"]
        "top level, '*'"
        >>> m["a"]
        "top level 'a'"
        >>> m[""]
        >>> m["*.*"]
        "level 2, '*.*'"
        >>> m["a.*"]
        "level 2, 'a.*'"
        >>> m["n.*"]
        "level 2, 'n.*"
        >>> m["n[2]"]
        "level 2, 'n[2]"
        >>> m["a.b.C"]
        'level 3, a.b.C'
        >>> m["a..C"]
        >>> m["a.*.C"]
        >>> m["a.*.*"]
        'level 3, a.*.*'
        >>> m["a.vs.dsd.dsdf"]
        """
        if path==root_key_str:
            # the root value is stored at length 0:
            return self._data.get(0)
        if path=="":
            return None
        keylist= StructuredDataStore.split_path(path)
        # descend into the tree for paths of this length:
        d= self._data.get(len(keylist))
        for k in keylist:
            if not isinstance(d, dict):
                return None
            d= d.get(k)
            if d is None:
                return None
        return d

    def __delitem__(self, path):
        """remove a single path.

        Raises ValueError when the object is locked and KeyError when
        the root path is not present.  Empty intermediate dictionaries
        (and empty wildcard order lists) are removed as well.

        >>> d={"#": 1,
        ...    "a": 2,
        ...    "a.b": 3,
        ...   }
        >>> m=MatchPaths(d)
        >>> Repr.printrepr(m._data)
        {0: 1, 1: {'a': 2}, 2: {'a': {'b': 3}}}
        >>> del m["#"]
        >>> Repr.printrepr(m._data)
        {1: {'a': 2}, 2: {'a': {'b': 3}}}
        >>> m=MatchPaths(d)
        >>> del m["a"]
        >>> Repr.printrepr(m._data)
        {0: 1, 2: {'a': {'b': 3}}}
        >>> m=MatchPaths(d)
        >>> del m["a.b"]
        >>> Repr.printrepr(m._data)
        {0: 1, 1: {'a': 2}}
        >>> d={"#": 1,
        ...    "a": 2,
        ...    "b": 3,
        ...    "a.b": 4,
        ...    "a.c": 5,
        ...   }
        >>> m=MatchPaths(d)
        >>> del m["a"]
        >>> Repr.printrepr(m._data)
        {0: 1, 1: {'b': 3}, 2: {'a': {'b': 4, 'c': 5}}}
        >>> del m["b"]
        >>> Repr.printrepr(m._data)
        {0: 1, 2: {'a': {'b': 4, 'c': 5}}}
        >>> m=MatchPaths(d)
        >>> del m["a.b"]
        >>> Repr.printrepr(m._data)
        {0: 1, 1: {'a': 2, 'b': 3}, 2: {'a': {'c': 5}}}
        >>> del m["a.c"]
        >>> Repr.printrepr(m._data)
        {0: 1, 1: {'a': 2, 'b': 3}}
        >>> d={"#": 1,
        ...    "a.b.c.d": 2,
        ...    "a.b.c.e": 3
        ...   }
        >>> m=MatchPaths(d)
        >>> del m["a.b.c.e"]
        >>> Repr.printrepr(m._data)
        {0: 1, 4: {'a': {'b': {'c': {'d': 2}}}}}
        >>> del m["a.b.c.d"]
        >>> Repr.printrepr(m._data)
        {0: 1}
        >>> d={"#": 1,
        ...    "a.b.*.d": 2,
        ...    "a.b.*.e": 3
        ...   }
        >>> m=MatchPaths(d)
        >>> Repr.printrepr(m._data)
        {0: 1, 4: {'a': {'b': {None: [ANYKEY], ANYKEY: {'d': 2, 'e': 3}}}}}
        >>> del m["a.b.*.d"]
        >>> Repr.printrepr(m._data)
        {0: 1, 4: {'a': {'b': {None: [ANYKEY], ANYKEY: {'e': 3}}}}}
        >>> del m["a.b.*.e"]
        >>> Repr.printrepr(m._data)
        {0: 1}
        """
        if self._locked:
            raise ValueError(("modification of %s failed, "+\
                              "object is locked") % self.__class__.__name__)
        if path==root_key_str:
            if 0 not in self._data:
                raise KeyError("path %s not found" % path)
            del self._data[0]
            return
        keylist= StructuredDataStore.split_path(path)
        last_key= len(keylist)-1
        def _del(dict_,index):
            """recursive delete.

            returns False when the recursive delete should stop, True
            when the caller's dict entry became empty and must be
            removed too.
            """
            key= keylist[index]
            val= dict_[key]
            if index<last_key:
                if not _del(val, index+1):
                    return False
            if isinstance(key, SpecialKey):
                # also remove the key from the wildcard order list
                # stored under the <None> key:
                lst= dict_[None]
                lst.remove(key)
                if not lst:
                    del dict_[None]
            del dict_[key]
            return len(dict_)<=0
        l= len(keylist)
        _del(self._data[l], 0)
        if not self._data[l]:
            # no paths of this length remain:
            del self._data[l]

    def __setitem__(self, key, val):
        """add a single new path.

        This method adds a single path and value to the MatchPaths
        object. It is implicitly used in a statement like this
        "matchpathobject[path]= value".
        """
        self.add( ((key,val),) )
    def match(self, path, return_matchpath= False):
        """match a path in the MatchPaths.

        This method returns the value of the path in the MatchPaths
        object that matches the given path best.  Exact keys are
        preferred over wildcards at each position; wildcards are tried
        in the order given by the per-level wildcard list (stored under
        the <None> key).  When return_matchpath is True, a tuple of the
        matched pattern path and the value is returned instead.

        Here are some examples:

        >>> d={"#": "top level",
        ...    "*": "top level, '*'",
        ...    "a": "top level 'a'",
        ...    "*.*": "level 2, '*.*'",
        ...    "a.*": "level 2, 'a.*'",
        ...    "n.*": "level 2, 'n.*",
        ...    "n[2]": "level 2, 'n[2]",
        ...    "a.b.C": "level 3, a.b.C",
        ...    "a.*.c": "level 3, a.*.c",
        ...    "a.*.*": "level 3, a.*.*"
        ...   }
        >>> m=MatchPaths(d)
        >>> m.match("#")
        'top level'
        >>> m.match("abc")
        "top level, '*'"
        >>> m.match("a")
        "top level 'a'"
        >>> m.match("x.y")
        "level 2, '*.*'"
        >>> m.match("a.y")
        "level 2, 'a.*'"
        >>> m.match("a.y.c")
        'level 3, a.*.c'
        >>> m.match("a.b.c")
        'level 3, a.*.c'
        >>> m.match("a.b.C")
        'level 3, a.b.C'
        >>> m.match("a.x.y")
        'level 3, a.*.*'
        >>> m.match("n[2]")
        "level 2, 'n[2]"
        >>> m.match("n[0]")
        "level 2, 'n.*"
        >>> m.match("x.y.z")
        >>> m=MatchPaths(d)
        >>> m.match("#",True)
        ('#', 'top level')
        >>> m.match("abc",True)
        ('*', "top level, '*'")
        >>> m.match("x.y",True)
        ('*.*', "level 2, '*.*'")
        >>> m.match("a.b.c",True)
        ('a.*.c', 'level 3, a.*.c')
        >>> m.match("a.b.C",True)
        ('a.b.C', 'level 3, a.b.C')
        >>> m.match("n[2]",True)
        ('n[2]', "level 2, 'n[2]")
        >>> m.match("n[0]",True)
        ('n.*', "level 2, 'n.*")
        """
        def walk(last, matchkeys, keylist, index, dct):
            """walk the tree.

            Tries an exact key match first, then each wildcard in order;
            backtracks when a branch leads nowhere.  matchkeys records
            which (possibly wildcard) key matched at each position.
            """
            val= dct.get(keylist[index])
            if val is not None:
                if index==last:
                    return val
                # here val is another dictionary:
                val= walk(last, matchkeys, keylist, index+1, val)
                if val is not None:
                    # the recursively called walk has found something:
                    return val
            # the recursively called walk has found nothing from here
            wildcards= dct.get(None)
            if wildcards is None:
                # no wildcards here
                return None
            for w in wildcards:
                if w.match(keylist[index]):
                    matchkeys[index]= w
                    val= dct[w]
                    if index==last:
                        return val
                    # here val is another dictionary:
                    val= walk(last, matchkeys, keylist, index+1, val)
                    if val is not None:
                        # the recursively called walk has found something:
                        return val
                    # undo the wildcard note before trying the next one:
                    matchkeys[index]= keylist[index]
            # the recursively called walk has found nothing from here
            return None
        # def. of function "walk" ends here
        # ---------------------------------
        if path==root_key_str:
            # pylint: disable= no-else-return
            if not return_matchpath:
                return self._data.get(0)
            else:
                return (root_key_str, self._data.get(0))
        keylist= StructuredDataStore.split_path(path)
        # keylist may contain strings and integers
        # now get the dictionary with paths keylists of this length:
        last= len(keylist)-1
        d= self._data.get(len(keylist))
        if d is None:
            return None
        matchkeys= keylist[:]
        # pylint: disable= no-else-return
        if not return_matchpath:
            return walk(last, matchkeys, keylist, 0, d)
        else:
            res= walk(last, matchkeys, keylist, 0, d)
            return (StructuredDataStore.join_path(matchkeys), res)
[docs] def as_path_dict(self): """convert the MatchPaths object to a dictionary. This method returns a dictionary than contains all the data of the MatchPaths object. Here is an example: >>> d={"#": "top level", ... "*": "top level, '*'", ... "a": "top level 'a'", ... "*.*": "level 2, '*.*'", ... "a.*": "level 2, 'a.*'", ... "a.*.b": "level 3, a.*.b", ... "a.*.*": "level 3, a.*.*" ... } >>> m=MatchPaths(d) >>> Repr.printnice(m.as_path_dict(), 0) { '#': 'top level', '*': "top level, '*'", '*.*': "level 2, '*.*'", 'a': "top level 'a'", 'a.*': "level 2, 'a.*'", 'a.*.*': 'level 3, a.*.*', 'a.*.b': 'level 3, a.*.b' } """ return { p: v for (p,_,v) in self.m_direct_iter() }
    def __repr__(self):
        """returns a python expression of the object.

        Here is an example:

        >>> d={"#":"a","*":"b","a.*.c":"c"}
        >>> m=MatchPaths(d)
        >>> repr(m)
        "MatchPaths({'#': 'a', '*': 'b', 'a.*.c': 'c'})"
        """
        spec= self.as_path_dict()
        # use the actual class name so subclasses print correctly:
        return "%s(%s)" % (self.__class__.__name__,Repr.repr(spec))

    def __str__(self):
        """return the object as a human readable string.

        Here is an example:

        >>> d={"#": "top level",
        ...    "*": "top level, '*'",
        ...    "*.*": "level 2, '*.*'",
        ...    "a.*": "level 2, 'a.*'",
        ...    "a.*.b": "level 3, a.*.b",
        ...    "a.*.*": "level 3, a.*.*"
        ...   }
        >>> m=MatchPaths(d)
        >>> print(str(m))
        MatchPaths(
          { '#': 'top level',
            '*': "top level, '*'",
            '*.*': "level 2, '*.*'",
            'a.*': "level 2, 'a.*'",
            'a.*.*': 'level 3, a.*.*',
            'a.*.b': 'level 3, a.*.b'}
        )
        """
        spec= self.as_path_dict()
        lines= ["%s(" % self.__class__.__name__]
        l= pprint.pformat(spec, indent=2, width=70).split("\n")
        if len(l)==1:
            # short data fits on one line:
            return "%s(%s)" % (self.__class__.__name__,l[0])
        l[-1]+="\n)"
        lines.extend(l)
        return "\n ".join(lines)
class StructuredDataTypes(MatchPaths):
    """a collection of type specifications for a number of paths.

    This class provides a store for paths and associated type
    specifications. It is a derivative of MatchPaths. For each path a
    typespec is specified. A typespec must be understandable by
    check_type_scalar.
    """
    @staticmethod
    def _check_func(spec):
        # validate a SingleTypeSpec by doing a dry-run check; raises on
        # an invalid type specification:
        spec.check(None, dry_run= True)
    @staticmethod
    def _iterator_convert_spec(spec_iterator):
        """convert SD specifications to SingleTypeSpec.

        Yields (path, SingleTypeSpec) pairs for each (path, raw spec)
        pair of the given iterator.
        """
        for (k,v) in spec_iterator:
            yield (k, SingleTypeSpec(v))
    @staticmethod
    def _convert_spec(path_type_decl):
        """convert SD specifications to SingleTypeSpec.

        Returns a new dict with all values converted, or None when
        path_type_decl is None.
        """
        if path_type_decl is None:
            return None
        return dict( StructuredDataTypes._iterator_convert_spec(\
                         list(path_type_decl.items())) )
    def __init__(self, path_type_decl=None):
        """initialize the object from path_type_decl.

        This initializes the object. A python dictionary can be given as
        an optional parameter in order to initialize the object. The
        dictionary should map paths to type specifications.

        Here is an example:

        >>> t=StructuredDataTypes({"":"integer","*":"map","a.*":"list"})
        >>> t=StructuredDataTypes({"":"integer","*":"map","a.*":"listx"})
        Traceback (most recent call last):
            ...
        ValueError: unknown typespec: "listx"
        """
        MatchPaths.__init__(self,
                            StructuredDataTypes._convert_spec(path_type_decl),
                            StructuredDataTypes._check_func)
    def add(self, spec, check_func= None):
        """add paths and type specifications.

        This method adds paths and type specifications from a given
        dictionary or a given list of pairs.  The raw specifications are
        converted to SingleTypeSpec objects and validated.

        Here are some examples:

        >>> t=StructuredDataTypes({"":"integer","*":"map","a.*":"list"})
        >>> t.add({"b":"integer","c.d":"real"})
        >>> t.add({"x":"blah"})
        Traceback (most recent call last):
            ...
        ValueError: unknown typespec: "blah"
        """
        if check_func is None:
            # default: validate each converted typespec
            check_func= StructuredDataTypes._check_func
        MatchPaths.add(self,
                       StructuredDataTypes._convert_spec(spec),
                       check_func)
    def iterator_add(self, iterator, check_func= None):
        """add paths and type specifications from an iterator.

        This method adds paths and type specifications from a iterator.
        The iterator must provide pairs of a path and a type
        specification.  The raw specifications are converted to
        SingleTypeSpec objects on the fly.

        Here are some examples:

        >>> t=StructuredDataTypes({"":"integer","*":"map","a.*":"list"})
        >>> t.iterator_add({"b":"integer","c.d":"real"}.items())
        >>> t.iterator_add({"x":"blah"}.items())
        Traceback (most recent call last):
            ...
        ValueError: unknown typespec: "blah"
        """
        if check_func is None:
            # default: validate each converted typespec
            check_func= StructuredDataTypes._check_func
        MatchPaths.iterator_add(self,\
                StructuredDataTypes._iterator_convert_spec(iterator),\
                check_func)
    def __getitem__(self,path):
        """get a single item of the StructuredDataTypes.

        This method returns a type declaration (a SingleTypeSpec object,
        or None when the path is not stored) for a given path. By this
        you can use the StructuredDataTypes object like this:
        "structureddatatypes[path]".

        Here are some examples:

        >>> t=StructuredDataTypes({"#":"integer","*":"map","a.*":"list"})
        >>> t["#"]
        SingleTypeSpec('integer')
        >>> t["*"]
        SingleTypeSpec('map')
        >>> t["a.*"]
        SingleTypeSpec('list')
        >>> print(t["a.b"])
        None
        """
        return MatchPaths.__getitem__(self,path)
[docs] def as_path_dict(self): """return the object as a dict. Here is an example: >>> t=StructuredDataTypes({"#":"integer","*":"map","a.*":"list"}) >>> Repr.printrepr(t.as_path_dict()) {'#': 'integer', '*': 'map', 'a.*': 'list'} """ d= MatchPaths.as_path_dict(self) for (k,v) in d.items(): d[k]= v.to_dict() # v is a SingleTypeSpec object return d
[docs] def check(self, path, value): """typecheck a value for a given path. This method takes a path and a value. It then searches the object for a matching path and checks the value against the type specification associated with the found path. If the typecheck fails, it raises an exception. Here are some examples: >>> d={"#": {"optional_struct":["a","b"]}, ... "a": {"typed_map":"integer"}, ... "b": "map", ... "b.*": {"typed_list":"string"}, ... "n[0]": "string", ... "n.*": "integer", ... } >>> t= StructuredDataTypes(d) >>> t.check("#", {"a":1}) >>> t.check("#", {"a":1}) >>> t.check("#", {"a":0,"b":1}) >>> t.check("#", {"a":0,"b":1,"c":2}) Traceback (most recent call last): ... TypeError: key c not in list of allowed keys >>> t.check("a", {1:"x",2:"y"}) >>> t.check("a", {1:"x",2:"y","z":"zz"}) Traceback (most recent call last): ... TypeError: integer expected, got: 'z' >>> t.check("a", 2) Traceback (most recent call last): ... TypeError: map expected >>> t.check("b", {"y":1, 2:"a"}) >>> t.check("b", [1,2]) Traceback (most recent call last): ... TypeError: map expected >>> t.check("b.x", ["a","b"]) >>> t.check("b.x", ["a","b",3]) Traceback (most recent call last): ... TypeError: string expected, got: 3 >>> t.check("b.x", 2) Traceback (most recent call last): ... TypeError: list expected >>> t.check("n[0]", "ab") >>> t.check("n[0]", 2) Traceback (most recent call last): ... TypeError: string expected, got: 2 >>> t.check("n[1]", 2) >>> t.check("n[1]", "a") Traceback (most recent call last): ... TypeError: integer expected, got: 'a' """ t= self.match(path) if t is None: return t.check(value)
class StructuredDataContainer():
    """This is a simple container that can hold a StructuredDataStore,
    StructuredDataTypes or both.

    A StructuredDataContainer is a map that always has the key
    "**SDC-Metadata**" and the optional keys "**SDC-Store**" and
    "**SDC-Types**".

    SDC-Metadata:
      This is a simple map that contains information on the format used.
      Currently it has only one key, "version" which has as value the
      current version number of the format as a string. As of the
      writing of this document, the current version is "1.0".

    SDC-Store:
      This is the StructuredDataStore map.

    SDC-Types:
      This is the StructuredDataTypes map.
    """
    MetaDataTag="**SDC-Metadata**"
    DataTag    ="**SDC-Store**"
    TypeTag    ="**SDC-Types**"
    Version    ="1.0"
    def _meta_check(self, initializer, change_meta= False):
        """validate (and optionally take over) the meta-data part.

        initializer -- must be a dict containing the MetaDataTag key,
                       which must map to a dict with a "version" entry.
        change_meta -- when True, a shallow copy of the meta data is
                       stored in self._meta.

        Raises TypeError when initializer is not a dict and ValueError
        when meta data or version are missing or the version does not
        match.
        """
        SDC= StructuredDataContainer
        if not isinstance(initializer, dict):
            raise TypeError("initializer must be a dict")
        _meta= initializer.get(SDC.MetaDataTag)
        if _meta is None:
            raise ValueError("the key %s is missing in the dict" % \
                             SDC.MetaDataTag)
        version= _meta.get("version")
        if version is None:
            raise ValueError("the version number is missing in the metadata")
        if version!=SDC.Version:
            # BUGFIX: this used to format _meta[SDC.Version], i.e.
            # _meta["1.0"], which raised KeyError instead of the
            # intended ValueError with the found version:
            raise ValueError(("wrong version of data format, " + \
                              "found: %s expected: %s") %\
                             (version, SDC.Version))
        if change_meta:
            self._meta= _meta.copy() # shallow copy !!
[docs] def pickle(self, filename): """write StructuredDataContainer pickled.""" stream= open(filename, 'w') _pickle_write(stream, self.as_dict()) stream.close()
[docs] @staticmethod def unpickle(filename): """load from pickled data.""" stream= open(filename, 'r') dict_= pickle.load(stream) stream.close() return StructuredDataContainer(dict_)
[docs] @staticmethod def from_yaml_file(filename): """create a StructuredDataContainer from a yaml file. Does also lock the file during reading. """ if use_lockfile: lk= lock.MyLock(filename, timeout= FILE_LOCK_TIMEOUT) try: lk.lock() except (lock.LockedError, lock.AccessError, lock.NoSuchFileError, OSError) as e: raise AssertionError(("file locking of file %s failed with error %s, "+\ "you may try to manually remove the lockfile") % \ (filename, repr(e))) try: dict_= _read_cached(filename, myyaml.read_file) finally: if use_lockfile: lk.unlock() return StructuredDataContainer(dict_)
[docs] @staticmethod def from_yaml_string(st): """create a StructuredDataContainer from a yaml file. """ dict_= myyaml.read_string(st) return StructuredDataContainer(dict_)
[docs] def clone(self, deepcopy= True): """clones a StructuredDataContainer object. Here is an example: >>> d= {"**SDC-Metadata**": {"version": "1.0" }, ... "**SDC-Store**" : {"a": {"x":1, "y":2 }, "b":10 }, ... "**SDC-Types**" : {"#": {"typed_map": "string"} } ... } >>> t= StructuredDataContainer(d) >>> shallow= t.clone(False) >>> deep = t.clone(True) >>> t.store()["a.y"]= "NEW" >>> t.store().setitem("a.z", "ADDED", True) >>> print(t) StructuredDataContainer( {'**SDC-Metadata**': {'version': '1.0'}}, StructuredDataStore({'a': {'x': 1, 'y': 'NEW', 'z': 'ADDED'}, 'b': 10}), StructuredDataTypes({'#': {'typed_map': 'string'}}) ) >>> print(shallow) StructuredDataContainer( {'**SDC-Metadata**': {'version': '1.0'}}, StructuredDataStore({'a': {'x': 1, 'y': 'NEW', 'z': 'ADDED'}, 'b': 10}), StructuredDataTypes({'#': {'typed_map': 'string'}}) ) >>> print(deep) StructuredDataContainer( {'**SDC-Metadata**': {'version': '1.0'}}, StructuredDataStore({'a': {'x': 1, 'y': 2}, 'b': 10}), StructuredDataTypes({'#': {'typed_map': 'string'}}) ) """ new= StructuredDataContainer() # pylint: disable= protected-access if not deepcopy: new._meta= copy.copy(self._meta) else: new._meta= copy.deepcopy(self._meta) if self._data is not None: new._data= self._data.clone(deepcopy) if self._types is not None: new._types= self._types.clone(deepcopy) return new
def __init__(self, dict_= None):
    """initialize the StructuredDataContainer object.

    This method initializes the object. The parameter dict_ may be
    used to initialize the object from a dictionary. The dictionary
    must have at least the key "**SDC-Metadata**" which must refer to
    a dictionary with the key "version" which must refer to a version
    number string.

    Optionally the dictionary may have the key "**SDC-Store**" which
    must refer to a dictionary with the StructuredDataStore data. It
    also may have the key "**SDC-Types**" which must refer to a
    dictionary with StructuredDataTypes data in it.

    parameters:
        dict_ -- the optional initialization dictionary described
                 above. If omitted, an empty container is created
                 whose metadata only holds the current version number.

    >>> d= {"**SDC-Metadata**": {"version": "1.0" },
    ...      "**SDC-Store**" : {"a":1, "b":2 },
    ...      "**SDC-Types**" : {"#": {"typed_map": "string"} }
    ...     }
    >>> t= StructuredDataContainer(d)

    >>> d= {"**SDC-Metadata**": {"version": "1.0" },
    ...      "**SDC-Store**" : {"a":1, "b":2 },
    ...     }
    >>> t= StructuredDataContainer(d)

    >>> d= {"**SDC-Metadata**": {"version": "1.0" },
    ...      "**SDC-Types**" : {"#": {"typed_map": "string"} }
    ...     }
    >>> t= StructuredDataContainer(d)

    >>> d= {"**SDC-Metadata**": {"version": "1.0" },
    ...     }
    >>> t= StructuredDataContainer(d)

    >>> d= {"**SDC-Store**" : {"a":1, "b":2 },
    ...      "**SDC-Types**" : {"#": {"typed_map": "string"} }
    ...     }
    >>> t= StructuredDataContainer(d)
    Traceback (most recent call last):
        ...
    ValueError: the key **SDC-Metadata** is missing in the dict
    """
    SDC= StructuredDataContainer
    if dict_ is None:
        # empty container: metadata with the current version number,
        # no store and no types:
        self._meta= {"version" : SDC.Version }
        self._data= None
        self._types= None
        return
    # NOTE(review): _meta_check(dict_, True) validates the metadata and
    # presumably also stores it into self._meta (the <True> flag, see
    # the ValueError doctest above) — confirm against _meta_check.
    self._meta_check(dict_, True)
    self._data= dict_.get(SDC.DataTag)
    if self._data is not None:
        # wrap the raw dictionary in a StructuredDataStore object:
        self._data= StructuredDataStore(self._data)
    self._types= dict_.get(SDC.TypeTag)
    if self._types is not None:
        # wrap the raw dictionary in a StructuredDataTypes object:
        self._types= StructuredDataTypes(self._types)
def lock(self):
    """make the StructuredDataContainer read-only.

    Locks the embedded StructuredDataStore and StructuredDataTypes
    objects. Either part may be None (e.g. in a container created
    with StructuredDataContainer()); such parts are skipped instead
    of raising AttributeError.
    """
    # guard against None, see __init__ which may leave either as None:
    if self._data is not None:
        self._data.lock()
    if self._types is not None:
        self._types.lock()
def add(self, dict_):
    """merge data from a dict into the StructuredDataContainer.

    The dictionary may carry two optional keys, "**SDC-Store**" and
    "**SDC-Types**". The data found under "**SDC-Store**" is merged
    into the internal StructuredDataStore, the data found under
    "**SDC-Types**" is merged into the internal StructuredDataTypes.
    If the container has no store or types object yet, a new one is
    created from the given data.

    Here is an example:

    >>> t= StructuredDataContainer({"**SDC-Metadata**": {"version": "1.0" },
    ...                             "**SDC-Store**":
    ...                                 {"a": {"x": 1 } },
    ...                             "**SDC-Types**":
    ...                                 {"a.x" : "integer" }
    ...                            }
    ...                           )
    >>>
    >>> print(t)
    StructuredDataContainer(
      {'**SDC-Metadata**': {'version': '1.0'}},
      StructuredDataStore({'a': {'x': 1}}),
      StructuredDataTypes({'a.x': 'integer'})
    )
    >>>
    >>> t.add({"**SDC-Metadata**": {"version": "1.0" },
    ...        "**SDC-Store**":
    ...            {"a": {"y": 2.0 } },
    ...       }
    ...      )
    >>>
    >>> print(t)
    StructuredDataContainer(
      {'**SDC-Metadata**': {'version': '1.0'}},
      StructuredDataStore({'a': {'x': 1, 'y': 2.0}}),
      StructuredDataTypes({'a.x': 'integer'})
    )
    >>>
    >>> t.add({"**SDC-Metadata**": {"version": "1.0" },
    ...        "**SDC-Types**":
    ...            {"a.y" : "real" }
    ...       }
    ...      )
    >>> print(t)
    StructuredDataContainer(
      {'**SDC-Metadata**': {'version': '1.0'}},
      StructuredDataStore({'a': {'x': 1, 'y': 2.0}}),
      StructuredDataTypes({'a.x': 'integer', 'a.y': 'real'})
    )
    """
    SDC= StructuredDataContainer
    # the dict must carry valid metadata:
    self._meta_check(dict_, True)
    new_store= dict_.get(SDC.DataTag)
    if new_store is not None:
        if self._data is None:
            self._data= StructuredDataStore(new_store)
        else:
            self._data.add(new_store)
    new_types= dict_.get(SDC.TypeTag)
    if new_types is not None:
        if self._types is None:
            self._types= StructuredDataTypes(new_types)
        else:
            self._types.add(new_types)
def add_yaml_string(self, st):
    """parse yaml string <st> and merge its data into the container."""
    self.add(myyaml.read_string(st))
def add_yaml_file(self, filename):
    """read yaml file <filename> and merge its data into the container.

    NOTE(review): unlike from_yaml_file this neither creates a
    lockfile nor uses the pickle read cache — confirm whether that
    is intended.
    """
    self.add(myyaml.read_file(filename))
def as_dict(self):
    r"""return the complete container content as a single dictionary.

    The returned dictionary holds the metadata and, if present, the
    store and the types under their respective "**SDC-...**" keys.

    Here is an example:

    >>> d= {"**SDC-Metadata**": {"version": "1.0" },
    ...      "**SDC-Store**" : {"a":1, "b":2 },
    ...      "**SDC-Types**" : {"#": {"typed_map": "string"} }
    ...     }
    >>> t= StructuredDataContainer(d)
    >>> print(Repr.repr(t.as_dict()).replace("},","},\n"))
    {'**SDC-Metadata**': {'version': '1.0'},
     '**SDC-Store**': {'a': 1, 'b': 2},
     '**SDC-Types**': {'#': {'typed_map': 'string'}}}
    """
    SDC= StructuredDataContainer
    result= { SDC.MetaDataTag: self._meta }
    if self._data is not None:
        result[SDC.DataTag]= self._data.as_dict()
    if self._types is not None:
        result[SDC.TypeTag]= self._types.as_path_dict()
    return result
# pylint: disable= line-too-long def __repr__(self): """return a python expression representing the object. Here is an example: >>> d= {"**SDC-Metadata**": {"version": "1.0" }, ... "**SDC-Store**" : {"a":1, "b":2 }, ... "**SDC-Types**" : {"#": {"typed_map": "string"} } ... } >>> t= StructuredDataContainer(d) >>> print(repr(t)) StructuredDataContainer({'**SDC-Metadata**': {'version': '1.0'}, '**SDC-Store**': {'a': 1, 'b': 2}, '**SDC-Types**': {'#': {'typed_map': 'string'}}}) """ return "StructuredDataContainer(%s)" % Repr.repr(self.as_dict()) # pylint: enable= line-too-long def __str__(self): """return a more human readable representation of the object. Here is an example: >>> d= {"**SDC-Metadata**": {"version": "1.0" }, ... "**SDC-Store**" : {"a":1, "b":2 }, ... "**SDC-Types**" : {"#": {"typed_map": "string"} } ... } >>> t= StructuredDataContainer(d) >>> print(t) StructuredDataContainer( {'**SDC-Metadata**': {'version': '1.0'}}, StructuredDataStore({'a': 1, 'b': 2}), StructuredDataTypes({'#': {'typed_map': 'string'}}) ) """ SDC= StructuredDataContainer lines= ["StructuredDataContainer("] d= { SDC.MetaDataTag: self._meta } l= pprint.pformat(d, indent=2, width=70).split("\n") l[-1]+= "," if self._data is not None: l.extend( str(self._data).splitlines() ) if self._types is not None: if self._data is not None: l[-1]+= "," l.extend( str(self._types).splitlines() ) lines.extend(l) lines.append(")") return "\n ".join(lines)
def as_yaml_string(self):
    """serialize the complete container content to a yaml string."""
    dict_= self.as_dict()
    return myyaml.write_string(dict_)
def as_yaml_file(self, filename):
    """write the StructuredDataContainer to a yaml file.

    While the file is written it is protected by a lockfile (only if
    the module flag use_lockfile is True). A pickle cache file is
    updated as well; it may be used to speed up later reads of the
    same file.

    parameters:
        filename -- the name of the yaml file to write

    raises:
        AssertionError -- if the file could not be locked
    """
    if use_lockfile:
        lk= lock.MyLock(filename, timeout= FILE_LOCK_TIMEOUT)
        try:
            lk.lock()
        except (lock.LockedError,
                lock.AccessError,
                lock.NoSuchFileError, OSError) as e:
            # use repr(e) for consistency with from_yaml_file and
            # chain the original exception as the direct cause:
            raise AssertionError(("file locking of file %s failed "
                                  "with error %s, you may try to "
                                  "manually remove the lockfile") % \
                                 (filename, repr(e))) from e
    try:
        dict_= self.as_dict()
        myyaml.write_file(filename, dict_)
        _write_cache_file(filename, dict_)
    finally:
        if use_lockfile:
            lk.unlock()
def store(self):
    """return the embedded StructuredDataStore (may be None)."""
    return self._data
def clear_store(self):
    """drop the embedded StructuredDataStore object."""
    self._data= None
def set_store(self, data):
    """install <data> as the container's StructuredDataStore.

    <data> may also be None, which removes the store.

    raises:
        TypeError -- if data is neither None nor a
                     StructuredDataStore object
    """
    if (data is not None) and (not isinstance(data, StructuredDataStore)):
        raise TypeError("data must be of class StructuredDataStore")
    self._data= data
def types(self):
    """return the embedded StructuredDataTypes (may be None)."""
    return self._types
def clear_types(self):
    """drop the embedded StructuredDataTypes object."""
    self._types= None
def set_types(self, types_):
    """install <types_> as the container's StructuredDataTypes.

    <types_> may also be None, which removes the types.

    raises:
        TypeError -- if types_ is neither None nor a
                     StructuredDataTypes object
    """
    if (types_ is not None) and (not isinstance(types_, StructuredDataTypes)):
        raise TypeError("types must be of class StructuredDataTypes")
    self._types= types_
def meta(self):
    """return the metadata dictionary of the container."""
    return self._meta
def typecheck(self, other=None):
    """perform a typecheck on the data.

    This method performs a typecheck on the contained
    StructuredDataStore. If there is also a StructuredDataTypes
    object in the container, this is used for the typechecks.
    Otherwise, the parameter "other" is used to retrieve type check
    information. This parameter may either be a StructuredDataTypes
    object or a StructuredDataContainer with embedded
    StructuredDataTypes.

    parameters:
        other -- This optional parameter is used to provide the type
                 checking information. If this parameter is omitted,
                 the function looks for type checking information
                 embedded in the StructuredDataContainer. If it is
                 not found there either, an AssertionError is raised.
                 The parameter may be a StructuredDataTypes object or
                 a StructuredDataContainer object with embedded type
                 checking information.

    Here are some examples:

    >>> d= {"**SDC-Metadata**": {"version": "1.0" },
    ...      "**SDC-Store**" : {"a":1, "b":2 },
    ...      "**SDC-Types**": {"#": {"typed_map": "string"} }
    ...     }
    >>> t= StructuredDataContainer(d)
    >>> t.typecheck()

    >>> d= {"**SDC-Metadata**": {"version": "1.0" },
    ...      "**SDC-Store**" : { 2 :1, "b":2 },
    ...      "**SDC-Types**": {"#": {"typed_map": "string"} }
    ...     }
    >>> t= StructuredDataContainer(d)
    >>> t.typecheck()
    Traceback (most recent call last):
        ...
    TypeError: error at path "#": string expected, got: 2
    """
    def do_check(types_, path, value):
        """re-raise check errors with the offending path prepended."""
        try:
            types_.check(path, value)
        except TypeError as e:
            raise TypeError("error at path \"%s\": %s" % \
                            (path, str(e))) from e
        except AssertionError as e:
            raise AssertionError("error at path \"%s\": %s" % \
                                 (path, str(e))) from e
    if self._data is None:
        # nothing to check:
        return
    if other is None:
        if self._types is None:
            raise AssertionError("This StructuredDataContainer has "
                                 "no type definitions")
        types_= self._types
    elif isinstance(other, StructuredDataTypes):
        types_= other
    elif isinstance(other, StructuredDataContainer):
        types_= other.types()
        if not isinstance(types_, StructuredDataTypes):
            raise ValueError("other object contains no typechecks")
    else:
        raise TypeError("other object has wrong type %s" % type(other))
    # check the root of the store first, then every node of the store:
    do_check(types_, root_key_str, self._data.as_dict())
    for (path, _, value) in self._data.universal_iter(patterns="",
                                                      path_list= None,
                                                      only_leaves= False,
                                                      sorted_= False,
                                                      show_links= False):
        do_check(types_, path, value)
def _test(): # pylint: disable= import-outside-toplevel import doctest doctest.testmod() if __name__ == "__main__": _test()