Coverage for gws-app/gws/core/tree_impl.py: 88%
248 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
1"""Internal implementations of ``gws.Node`` and ``gws.Root`` methods."""
3from . import (
4 const as c,
5 util as u,
6 log,
7)
# Module-level slots for core types used by the functions below; all start as ``None``.
# NOTE(review): presumably rebound to the real classes elsewhere in the package -- confirm.
Access = Error = ConfigurationError = Data = Props = Object = None
def object_repr(self):
    """Build a debug representation of an object.

    The result is ``<NAME [title=...] [uid=...] 0xADDR>``, where NAME is the
    object's ``extName`` if set and its qualified class name otherwise.
    """
    parts = [getattr(self, 'extName', None) or class_name(self)]

    title = getattr(self, 'title', None)
    if title:
        parts.append(f'title={title!r}')

    uid = getattr(self, 'uid', None)
    if uid:
        parts.append(f'uid={uid}')

    parts.append(hex(id(self)))
    return '<' + ' '.join(parts) + '>'
def node_initialize(self, config):
    """Attach ``config`` to the node, compute its permissions and run the configure hooks.

    The ``pre_configure`` hooks run before the ``configure`` hooks.
    """
    self.config = config
    self.permissions = configure_permissions(self)
    for hook in ('pre_configure', 'configure'):
        super_invoke(self, hook)
def node_create_child(self, classref, config, **kwargs):
    """Create a node with this node as parent by delegating to ``self.root.create``."""
    root = self.root
    return root.create(classref, parent=self, config=config, **kwargs)
def node_create_child_if_configured(self, classref, config=None, **kwargs):
    """Create a child node via ``self.root.create``, but only when ``config`` is truthy.

    Returns ``None`` without creating anything when ``config`` is missing or empty.
    """
    if config:
        return self.root.create(classref, parent=self, config=config, **kwargs)
    return None
def node_create_children(self, classref, configs, **kwargs):
    """Create one child per entry in ``configs`` using ``self.create_child``.

    Returns an empty list when ``configs`` is falsy; otherwise the created
    children are passed through ``u.compact`` before being returned.
    """
    if configs:
        created = (self.create_child(classref, cfg, **kwargs) for cfg in configs)
        return u.compact(created)
    return []
def node_cfg(self, key: str, default=None):
    """Look up ``key`` in the node's config with ``u.get``; return ``default`` if the result is ``None``."""
    found = u.get(self.config, key)
    if found is None:
        return default
    return found
def node_find_all(self, classref):
    """Return the direct children of this node that match ``classref``."""
    candidates = self.children
    return find_all_in(self.root, candidates, classref)
def node_find_first(self, classref):
    """Return the first direct child of this node that matches ``classref``, or ``None``."""
    candidates = self.children
    return find_first_in(self.root, candidates, classref)
def node_find_closest(self, classref):
    """Return the nearest ancestor matching ``classref``.

    The walk starts at ``self.parent`` and stops at the root, which is never
    returned. A falsy ``classref`` matches any ancestor. Returns ``None`` if
    nothing matches.
    """
    candidate = self.parent
    while candidate and candidate is not self.root:
        if not classref or is_a(self.root, candidate, classref):
            return candidate
        candidate = candidate.parent
    return None
def node_find_ancestors(self, classref):
    """Collect the ancestors matching ``classref``, nearest first.

    The walk starts at ``self.parent`` and stops at the root, which is never
    included. A falsy ``classref`` matches every ancestor.
    """
    matched = []
    current = self.parent
    while current and current is not self.root:
        if not classref or is_a(self.root, current, classref):
            matched.append(current)
        current = current.parent
    return matched
def node_find_descendants(self, classref):
    """Collect all descendants matching ``classref`` in depth-first pre-order.

    A falsy ``classref`` matches every descendant. The node itself is not included.
    """
    found = []
    # Explicit stack; children are pushed reversed so they pop in their original order.
    pending = list(reversed(self.children))
    while pending:
        node = pending.pop()
        if not classref or is_a(self.root, node, classref):
            found.append(node)
        pending.extend(reversed(node.children))
    return found
101##
def root_init(self, specs):
    """Reset the root to an empty tree bound to ``specs``."""
    self.specs = specs
    self.app = None
    self.permissions = {}

    # configuration bookkeeping
    self.configErrors = []
    self.configStack = []
    self.configPaths = []

    # node registry; the uid counter starts at 1
    self.nodes = []
    self.uidMap = {}
    self.uidCount = 1
def root_initialize(self, node, config):
    """Run ``node.initialize(config)`` with the node pushed on the config stack.

    An ``Exception`` raised by the node is logged and registered as a
    configuration error instead of propagating.

    Returns:
        ``True`` if initialization succeeded, ``False`` if it raised.
    """
    self.configStack.append(node)

    succeeded = True
    try:
        node.initialize(config)
    except Exception as exc:
        log.exception()
        register_config_error(self, exc)
        succeeded = False

    self.configStack.pop()
    return succeeded
def root_post_initialize(self):
    """Run the ``post_configure`` hooks of every registered node, last registered first.

    Before each node's hooks run, the config stack is rebuilt as the chain of
    ``parent`` links ending at that node, outermost first. An ``Exception``
    raised by a hook is logged and registered as a configuration error.
    """
    for node in reversed(self.nodes):
        chain = []
        link = node
        while link:
            chain.append(link)
            link = getattr(link, 'parent', None)
        chain.reverse()
        self.configStack = chain

        try:
            super_invoke(node, 'post_configure')
        except Exception as exc:
            log.exception()
            register_config_error(self, exc)
def root_activate(self):
    """Call ``activate()`` on every registered node, in registration order."""
    for registered in self.nodes:
        registered.activate()
def root_find_all(self, classref):
    """Return all registered nodes that match ``classref``."""
    registered = self.nodes
    return find_all_in(self, registered, classref)
def root_find_first(self, classref):
    """Return the first registered node that matches ``classref``, or ``None``."""
    registered = self.nodes
    return find_first_in(self, registered, classref)
def root_get(self, uid, classref):
    """Look up a registered node by ``uid``.

    Returns ``None`` when ``uid`` is falsy, when no node is registered under
    it, or when ``classref`` is given and the node does not match it.
    """
    if not uid:
        return None

    found = self.uidMap.get(uid)
    if not found:
        return None
    if classref and not is_a(self, found, classref):
        return None
    return found
def root_object_count(self) -> int:
    """Return the number of registered nodes."""
    registered = self.nodes
    return len(registered)
def root_create(self, classref, parent, config, **kwargs):
    """Create a node of class ``classref`` under ``parent``.

    ``kwargs`` are merged with ``config`` by ``to_config`` before the node is created.
    """
    merged = to_config(config, kwargs)
    return create_node(self, classref, parent, merged)
def root_create_shared(self, classref, config, **kwargs):
    """Return the parentless node registered under the config's uid, creating it if needed.

    When the merged config has no uid, one is derived as ``'_s_'`` plus a hash
    of the class reference and the config.
    """
    config = to_config(config, kwargs)

    if not config.uid:
        config.uid = '_s_' + u.sha256([repr(classref), config])

    shared_uid = config.uid
    if shared_uid in self.uidMap:
        return self.uidMap[shared_uid]

    return create_node(self, classref, None, config)
def root_create_temporary(self, classref, config, **kwargs):
    """Create a parentless node via ``create_node`` with ``temp=True``."""
    merged = to_config(config, kwargs)
    return create_node(self, classref, None, merged, temp=True)
def root_create_application(self, config, **kwargs):
    """Allocate, register and initialize the application node.

    The node gets the fixed uid ``'1'`` and the root as its parent, and is
    stored as ``self.app``. It is registered before being initialized and is
    returned regardless of the initialization result.
    """
    merged = to_config(config, kwargs)

    app = alloc_node(self, 'gws.base.application.core.Object')
    app.uid = '1'
    app.parent = self
    app.children = []

    # register first, then initialize
    self.app = app
    self.uidMap[app.uid] = app
    self.nodes.append(app)

    self.initialize(app, merged)

    return app
212##
def class_name(node):
    """Return the qualified class name of ``node`` as ``module.ClassName``."""
    cls = node.__class__
    return '.'.join((cls.__module__, cls.__name__))
def alloc_node(self, classref, typ=None):
    """Instantiate the class resolved from ``classref`` / ``typ`` and bind it to this root.

    The new object gets ``root`` set to this root, and ``extName`` / ``extType``
    copied from its class (empty string when the class lacks them).

    Raises:
        Error: if ``self.specs.get_class`` returns nothing for the reference.
    """
    cls = self.specs.get_class(classref, typ)
    if cls:
        node = cls()
        node.root = self
        for attr in ('extName', 'extType'):
            setattr(node, attr, getattr(cls, attr, ''))
        return node

    raise Error(f'class {classref}:{typ} not found')
def configure_permissions(self):
    """Build the permissions mapping of a node from its ``access`` and ``permissions`` config.

    ``access`` sets the read permission. In ``permissions``, the key ``all``
    applies to read, write, create and delete, ``edit`` applies to write,
    create and delete, and any other key is stored under its own name.
    Entries whose parsed ACL is falsy are skipped.
    """
    perms = {}

    access = self.cfg('access')
    if access:
        perms[Access.read] = u.parse_acl(access)

    permissions = self.cfg('permissions')
    if not permissions:
        return perms

    for key, value in vars(permissions).items():
        acl = u.parse_acl(value)
        if not acl:
            continue
        if key == 'all':
            targets = (Access.read, Access.write, Access.create, Access.delete)
        elif key == 'edit':
            targets = (Access.write, Access.create, Access.delete)
        else:
            targets = (key,)
        for target in targets:
            perms[target] = acl

    return perms
def create_node(self, classref, parent, config, temp=False):
    """Allocate and initialize a node, then register it.

    Args:
        classref: Class reference passed to ``alloc_node`` together with the config's ``type``.
        parent: Parent node, or a falsy value for a parentless node.
        config: Node configuration.
        temp: If true, the node is not added to the root's node list and uid map.

    Returns:
        The new node, or ``None`` if its initialization failed.
    """
    node = alloc_node(self, classref, config.get('type'))
    node.uid = get_or_generate_uid(self, config)
    node.parent = parent
    node.children = []

    # indent the log line by the current depth of the config stack
    indent = '.' * 4 * len(self.configStack)
    log.debug('configure: ' + indent + f'{node!r} IN {parent or self!r}')

    if not self.initialize(node, config):
        log.debug(f'FAILED {node!r}')
        return None

    if not temp:
        self.nodes.append(node)
        self.uidMap[node.uid] = node

    if parent:
        parent.children.append(node)

    return node
def find_all_in(root, nodes, classref):
    """Filter ``nodes`` by a class reference.

    Args:
        root: Root object; its ``specs.parse_classref`` resolves ``classref``
            into a ``(class, qualified name, extension name)`` triple.
        nodes: List of nodes to search.
        classref: Class reference. If falsy, ``nodes`` is returned as is
            (the same list object, not a copy).

    Returns:
        A list of matching nodes. Nodes are matched by ``isinstance`` if a class
        was resolved, else by exact qualified class name, else by ``extName``
        prefix. If the reference resolves to none of these, the result is an
        empty list.
    """
    if not classref:
        return nodes

    cls, name, ext_name = root.specs.parse_classref(classref)
    if cls:
        return [node for node in nodes if isinstance(node, cls)]
    if name:
        return [node for node in nodes if class_name(node) == name]
    if ext_name:
        return [node for node in nodes if node.extName.startswith(ext_name)]

    # Nothing usable was resolved: always return a list (never ``None``),
    # mirroring ``is_a``, which reports ``False`` in this case.
    return []
def find_first_in(root, nodes, classref):
    """Return the first node in ``nodes`` that matches ``classref``, or ``None`` if there is none."""
    matches = find_all_in(root, nodes, classref)
    if matches:
        return matches[0]
    return None
def get_or_generate_uid(self, config):
    """Return the config's ``uid`` if it is truthy, otherwise generate one.

    A generated uid is the root's ``uidCount``, incremented first, as a string.
    """
    explicit = config.get('uid')
    if explicit:
        return explicit

    self.uidCount += 1
    return str(self.uidCount)
def is_a(root, node, classref):
    """Tell whether ``node`` matches the class reference ``classref``.

    The reference is resolved with ``root.specs.parse_classref`` into a
    ``(class, qualified name, extension name)`` triple. The node is checked by
    ``isinstance`` if a class was resolved, else by exact qualified class name,
    else by ``extName`` prefix. The result is ``False`` if none was resolved.
    """
    cls, name, ext_name = root.specs.parse_classref(classref)

    if cls:
        matches = isinstance(node, cls)
    elif name:
        matches = class_name(node) == name
    elif ext_name:
        matches = node.extName.startswith(ext_name)
    else:
        matches = False

    return matches
def props_of(node, user, *context):
    """Return the props of ``node`` as visible to ``user``.

    Returns ``None`` if ``user.can_use(node, *context)`` is false or the node
    yields no props. A ``Data`` result is returned as is and a plain dict is
    wrapped in ``Props``.

    Raises:
        Error: if the computed props are of any other type.
    """
    if not user.can_use(node, *context):
        return None

    props = make_props2(node, user)
    if props is None:
        return None
    if isinstance(props, Data):
        return props
    if isinstance(props, dict):
        return Props(props)

    raise Error('invalid props type')
def make_props2(obj, user):
    """Recursively convert ``obj`` into props for ``user``.

    Atoms (per ``u.is_atom``) are returned unchanged. An ``Object`` is replaced
    by its ``props(user)``, or by ``None`` if the user's read ACL bit for it is
    ``DENY``. ``Data`` is converted to its attribute dict. Dicts and lists are
    converted element by element and passed through ``u.compact``. Anything
    else yields ``None``.
    """
    if u.is_atom(obj):
        return obj

    value = obj

    if isinstance(value, Object):
        if user.acl_bit(Access.read, value) == c.DENY:
            return None
        value = value.props(user)

    if isinstance(value, Data):
        value = vars(value)

    if u.is_dict(value):
        converted = {key: make_props2(item, user) for key, item in value.items()}
        return u.compact(converted)

    if u.is_list(value):
        converted = [make_props2(item, user) for item in value]
        return u.compact(converted)

    return None
def register_config_error(self, exc):
    """Record ``exc`` as a ``ConfigurationError`` on the root.

    The error carries ``repr(exc)`` plus one ``'in <repr>'`` line per entry of
    the config stack, innermost first, and has ``exc`` as its ``__cause__``.
    """
    trace = [f'in {entry!r}' for entry in reversed(self.configStack)]
    error = ConfigurationError(repr(exc), *trace)
    error.__cause__ = exc
    self.configErrors.append(error)
def super_invoke(node, method):
    """Call ``method`` on ``node`` once for every class in its MRO that defines it.

    This automates the otherwise mandatory ``super().<method>()`` chaining:
    only classes that define ``method`` in their own namespace take part, and
    they are called from the most basic class to the most derived one.
    """
    owners = []

    for cls in type(node).mro():
        try:
            defines = method in vars(cls)
        except TypeError:
            # no inspectable namespace for this class, skip it
            continue
        if defines:
            owners.append(cls)

    for cls in reversed(owners):
        getattr(cls, method)(node)
def to_config(config, defaults):
    """Merge ``defaults`` and then ``config`` into a fresh ``Data`` object using ``u.merge``."""
    merged = u.merge(Data(), defaults, config)
    return merged