Coverage for gws-app/gws/core/tree_impl.py: 88%

248 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 23:09 +0200

1"""Internal implementations of ``gws.Node`` and ``gws.Root`` methods.""" 

2 

3from . import ( 

4 const as c, 

5 util as u, 

6 log, 

7) 

8 

9Access = None 

10Error = None 

11ConfigurationError = None 

12Data = None 

13Props = None 

14Object = None 

15 

16 

17def object_repr(self): 

18 r = getattr(self, 'extName', None) or class_name(self) 

19 s = getattr(self, 'title', None) 

20 if s: 

21 r += f' title={s!r}' 

22 s = getattr(self, 'uid', None) 

23 if s: 

24 r += f' uid={s}' 

25 return '<' + r + ' ' + hex(id(self)) + '>' 

26 

27 

28def node_initialize(self, config): 

29 self.config = config 

30 self.permissions = configure_permissions(self) 

31 super_invoke(self, 'pre_configure') 

32 super_invoke(self, 'configure') 

33 

34 

35def node_create_child(self, classref, config, **kwargs): 

36 return self.root.create(classref, parent=self, config=config, **kwargs) 

37 

38 

39def node_create_child_if_configured(self, classref, config=None, **kwargs): 

40 if not config: 

41 return None 

42 return self.root.create(classref, parent=self, config=config, **kwargs) 

43 

44 

45def node_create_children(self, classref, configs, **kwargs): 

46 if not configs: 

47 return [] 

48 return u.compact(self.create_child(classref, cfg, **kwargs) for cfg in configs) 

49 

50 

51def node_cfg(self, key: str, default=None): 

52 val = u.get(self.config, key) 

53 return val if val is not None else default 

54 

55 

56def node_find_all(self, classref): 

57 return find_all_in(self.root, self.children, classref) 

58 

59 

60def node_find_first(self, classref): 

61 return find_first_in(self.root, self.children, classref) 

62 

63 

64def node_find_closest(self, classref): 

65 node = self.parent 

66 while True: 

67 if not node or node is self.root: 

68 return 

69 if not classref or is_a(self.root, node, classref): 

70 return node 

71 node = node.parent 

72 

73 

74def node_find_ancestors(self, classref): 

75 ls = [] 

76 node = self.parent 

77 

78 while True: 

79 if not node or node is self.root: 

80 break 

81 if not classref or is_a(self.root, node, classref): 

82 ls.append(node) 

83 node = node.parent 

84 

85 return ls 

86 

87 

88def node_find_descendants(self, classref): 

89 ls = [] 

90 

91 def _walk(node): 

92 for child_node in node.children: 

93 if not classref or is_a(self.root, child_node, classref): 

94 ls.append(child_node) 

95 _walk(child_node) 

96 

97 _walk(self) 

98 return ls 

99 

100 

101## 

102 

103 

104def root_init(self, specs): 

105 self.specs = specs 

106 self.app = None 

107 self.permissions = {} 

108 self.configErrors = [] 

109 self.configStack = [] 

110 self.configPaths = [] 

111 self.nodes = [] 

112 self.uidMap = {} 

113 self.uidCount = 1 

114 

115 

116def root_initialize(self, node, config): 

117 self.configStack.append(node) 

118 

119 try: 

120 node.initialize(config) 

121 ok = True 

122 except Exception as exc: 

123 log.exception() 

124 register_config_error(self, exc) 

125 ok = False 

126 

127 self.configStack.pop() 

128 return ok 

129 

130 

131def root_post_initialize(self): 

132 for node in reversed(self.nodes): 

133 self.configStack = [] 

134 p = node 

135 while p: 

136 self.configStack.insert(0, p) 

137 p = getattr(p, 'parent', None) 

138 try: 

139 super_invoke(node, 'post_configure') 

140 except Exception as exc: 

141 log.exception() 

142 register_config_error(self, exc) 

143 

144 

145def root_activate(self): 

146 for node in self.nodes: 

147 # if type(node).activate != Node.activate: 

148 # log.debug(f'activate: {node!r}') 

149 node.activate() 

150 

151 

152def root_find_all(self, classref): 

153 return find_all_in(self, self.nodes, classref) 

154 

155 

156def root_find_first(self, classref): 

157 return find_first_in(self, self.nodes, classref) 

158 

159 

160def root_get(self, uid, classref): 

161 if not uid: 

162 return 

163 node = self.uidMap.get(uid) 

164 if node and (not classref or is_a(self, node, classref)): 

165 return node 

166 

167 

168def root_object_count(self) -> int: 

169 return len(self.nodes) 

170 

171 

172def root_create(self, classref, parent, config, **kwargs): 

173 config = to_config(config, kwargs) 

174 return create_node(self, classref, parent, config) 

175 

176 

177def root_create_shared(self, classref, config, **kwargs): 

178 config = to_config(config, kwargs) 

179 

180 uid = config.uid 

181 if not uid: 

182 config.uid = '_s_' + u.sha256([repr(classref), config]) 

183 

184 if config.uid in self.uidMap: 

185 return self.uidMap[config.uid] 

186 

187 return create_node(self, classref, None, config) 

188 

189 

190def root_create_temporary(self, classref, config, **kwargs): 

191 config = to_config(config, kwargs) 

192 return create_node(self, classref, None, config, temp=True) 

193 

194 

195def root_create_application(self, config, **kwargs): 

196 config = to_config(config, kwargs) 

197 

198 node = alloc_node(self, 'gws.base.application.core.Object') 

199 node.uid = '1' 

200 node.parent = self 

201 node.children = [] 

202 

203 self.nodes.append(node) 

204 self.uidMap[node.uid] = node 

205 self.app = node 

206 

207 self.initialize(node, config) 

208 

209 return node 

210 

211 

212## 

213 

214def class_name(node): 

215 return node.__class__.__module__ + '.' + node.__class__.__name__ 

216 

217 

218def alloc_node(self, classref, typ=None): 

219 cls = self.specs.get_class(classref, typ) 

220 if not cls: 

221 raise Error(f'class {classref}:{typ} not found') 

222 

223 node = cls() 

224 node.root = self 

225 node.extName = getattr(cls, 'extName', '') 

226 node.extType = getattr(cls, 'extType', '') 

227 

228 return node 

229 

230 

231def configure_permissions(self): 

232 perms = {} 

233 

234 p = self.cfg('access') 

235 if p: 

236 perms[Access.read] = u.parse_acl(p) 

237 

238 p = self.cfg('permissions') 

239 if p: 

240 for k, v in vars(p).items(): 

241 a = u.parse_acl(v) 

242 if a: 

243 if k == 'all': 

244 perms[Access.read] = perms[Access.write] = perms[Access.create] = perms[Access.delete] = a 

245 elif k == 'edit': 

246 perms[Access.write] = perms[Access.create] = perms[Access.delete] = a 

247 else: 

248 perms[k] = a 

249 

250 return perms 

251 

252 

253def create_node(self, classref, parent, config, temp=False): 

254 node = alloc_node(self, classref, config.get('type')) 

255 node.uid = get_or_generate_uid(self, config) 

256 node.parent = parent 

257 node.children = [] 

258 

259 log.debug('configure: ' + ('.' * 4 * len(self.configStack)) + f'{node!r} IN {parent or self !r}') 

260 ok = self.initialize(node, config) 

261 if not ok: 

262 log.debug(f'FAILED {node!r}') 

263 return 

264 

265 if not temp: 

266 self.nodes.append(node) 

267 self.uidMap[node.uid] = node 

268 

269 if parent: 

270 parent.children.append(node) 

271 

272 return node 

273 

274 

275def find_all_in(root, nodes, classref): 

276 if not classref: 

277 return nodes 

278 cls, name, ext_name = root.specs.parse_classref(classref) 

279 if cls: 

280 return [node for node in nodes if isinstance(node, cls)] 

281 if name: 

282 return [node for node in nodes if class_name(node) == name] 

283 if ext_name: 

284 return [node for node in nodes if node.extName.startswith(ext_name)] 

285 

286 

287def find_first_in(root, nodes, classref): 

288 found = find_all_in(root, nodes, classref) 

289 return found[0] if found else None 

290 

291 

292def get_or_generate_uid(self, config): 

293 if config.get('uid'): 

294 return config.get('uid') 

295 self.uidCount += 1 

296 return str(self.uidCount) 

297 

298 

299def is_a(root, node, classref): 

300 cls, name, ext_name = root.specs.parse_classref(classref) 

301 if cls: 

302 return isinstance(node, cls) 

303 if name: 

304 return class_name(node) == name 

305 if ext_name: 

306 return node.extName.startswith(ext_name) 

307 return False 

308 

309 

310def props_of(node, user, *context): 

311 if not user.can_use(node, *context): 

312 return None 

313 p = make_props2(node, user) 

314 if p is None or isinstance(p, Data): 

315 return p 

316 if isinstance(p, dict): 

317 return Props(p) 

318 raise Error('invalid props type') 

319 

320 

321def make_props2(obj, user): 

322 if u.is_atom(obj): 

323 return obj 

324 

325 if isinstance(obj, Object): 

326 if user.acl_bit(Access.read, obj) == c.DENY: 

327 return None 

328 obj = obj.props(user) 

329 

330 if isinstance(obj, Data): 

331 obj = vars(obj) 

332 

333 if u.is_dict(obj): 

334 return u.compact({k: make_props2(v, user) for k, v in obj.items()}) 

335 

336 if u.is_list(obj): 

337 return u.compact([make_props2(v, user) for v in obj]) 

338 

339 return None 

340 

341 

342def register_config_error(self, exc): 

343 lines = ['in ' + repr(val) for val in reversed(self.configStack)] 

344 err = ConfigurationError(repr(exc), *lines) 

345 err.__cause__ = exc 

346 self.configErrors.append(err) 

347 

348 

349def super_invoke(node, method): 

350 # since `super().configure` is mandatory in `configure` methods, 

351 # let's automate this by collecting all super 'configure' methods 

352 

353 mro = [] 

354 

355 for cls in type(node).mro(): 

356 try: 

357 if method in vars(cls): 

358 mro.append(cls) 

359 except TypeError: 

360 pass 

361 

362 for cls in reversed(mro): 

363 getattr(cls, method)(node) 

364 

365 

366def to_config(config, defaults): 

367 return u.merge(Data(), defaults, config)