Coverage for gws-app / gws / spec / runtime.py: 70%

143 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""Validate values according to specs""" 

2 

3from typing import Optional 

4 

5import re 

6 

7import gws 

8import gws.lib.jsonx 

9import gws.lib.importer 

10 

11from . import core, reader 

12from .generator import main as generator_main 

13 

# Exception classes from ``core``, re-exported under this module's namespace
# so that callers can refer to them without importing ``core`` themselves.
Error = core.Error
GeneratorError = core.GeneratorError
LoadError = core.LoadError
ReadError = core.ReadError

18 

19 

def create(manifest_path: str = '', read_cache=False, write_cache=False) -> 'Object':
    """Create the spec runtime object.

    Args:
        manifest_path: Path to the application manifest, or an empty string.
        read_cache: If true, try to load previously stored specs first.
        write_cache: If true, store freshly generated specs.

    Returns:
        An ``Object`` wrapping the loaded or generated spec data.
    """
    return Object(_get_specs(manifest_path, read_cache, write_cache))

23 

24 

def _get_specs(manifest_path: str = '', read_cache=False, write_cache=False) -> core.SpecData:
    """Load spec data from the cache file or run the generator.

    The cache file lives in ``gws.c.SPEC_DIR`` and its name is derived
    from the manifest path, so each manifest gets its own cache file.

    Args:
        manifest_path: Path to the application manifest, or an empty string.
        read_cache: If true and the cache file exists, try to load it.
        write_cache: If true, write generated specs to the cache file.

    Returns:
        The spec data, either read from the cache or freshly generated.
    """
    spec_dir = gws.u.ensure_dir(gws.c.SPEC_DIR)
    cache_uid = gws.u.to_uid(manifest_path or '')
    cache_path = spec_dir + '/spec_' + cache_uid + '.json'

    if read_cache and gws.u.is_file(cache_path):
        try:
            cached = generator_main.from_path(cache_path)
            gws.log.debug(f'spec.create: loaded from {cache_path!r}')
            return cached
        except gws.lib.jsonx.Error:
            # An unreadable cache is not fatal: log it and regenerate below.
            gws.log.exception('spec.create: load failed')

    if manifest_path:
        gws.log.debug(f'spec.create: using manifest {manifest_path!r}...')

    gws.debug.time_start('SPEC GENERATOR')
    generated = generator_main.generate(manifest_path=manifest_path)
    gws.debug.time_end()

    if write_cache:
        try:
            generator_main.to_path(generated, cache_path)
            gws.log.debug(f'spec.create: stored to {cache_path!r}')
        except gws.lib.jsonx.Error:
            # Failing to store the cache is logged only; the specs are still returned.
            gws.log.exception('spec.create: store failed')

    return generated

51 

52 

53## 

54 

55 

class Object(gws.SpecRuntime):
    """Runtime view of the spec data: type lookup, value reading and class loading."""

    def __init__(self, sd: core.SpecData):
        """Index the given spec data.

        Args:
            sd: Spec data, as loaded from the cache or produced by the generator.
        """
        self.sd = sd
        self.manifest = gws.ApplicationManifest(sd.meta['manifest'])
        self.manifestPath = sd.meta['manifestPath']
        self.version = sd.meta['version']

        self.serverTypes = sd.serverTypes
        # Types are addressable by their uid and, when they have one, by their extension name.
        self.serverTypesDict = {}
        # Bare names (last dotted component) of commands in the 'raw' category.
        self.rawCommands = set()

        raw_prefix = core.v.EXT_COMMAND_PREFIX + str(gws.CommandCategory.raw) + '.'

        for typ in self.serverTypes:
            self.serverTypesDict[typ.uid] = typ
            if typ.extName:
                self.serverTypesDict[typ.extName] = typ
                if typ.extName.startswith(raw_prefix):
                    self.rawCommands.add(typ.extName.split('.')[-1])

        self.strings = sd.strings
        self.chunks = sd.chunks

        # Unique bundle paths, in the order the chunks are listed.
        self.appBundlePaths = []
        for chunk in self.chunks:
            path = chunk.bundleDir + '/' + gws.c.JS_BUNDLE
            if path not in self.appBundlePaths:
                self.appBundlePaths.append(path)

        # Descriptors by name; filled by `object_descriptor` and `register_object`.
        self._descCache = {}

    def __getstate__(self):
        """Return the state to pickle, with an empty descriptor cache.

        Cached descriptors can carry class pointers (see `register_object`
        and `get_class`), so the cache is left out of the pickled state.
        The state is a copy: the live object keeps its cache, and with it
        any classes registered via `register_object`.
        """
        state = dict(vars(self))
        state['_descCache'] = {}
        return state

    def get_type(self, key):
        """Return the type stored under a uid or extension name, or None."""
        return self.serverTypesDict.get(key)

    def read(self, value, type_name, path='', options=None):
        """Read `value` as `type_name` using a `reader.Reader`.

        Args:
            value: The raw value to read.
            type_name: Name of the type to read the value as.
            path: Passed through to the reader.
            options: Passed through to the reader.

        Returns:
            Whatever `Reader.read` returns for the value.
        """
        r = reader.Reader(self, path, options)
        return r.read(value, type_name)

    def object_descriptor(self, name):
        """Return the (cached) object descriptor for a type name.

        Returns:
            A `gws.ExtObjectDescriptor`, or None if no type has this name.
            A freshly built descriptor has no class pointer yet;
            `get_class` fills it in on demand.
        """
        if name in self._descCache:
            return self._descCache[name]

        typ = self.get_type(name)
        if not typ:
            return

        self._descCache[name] = gws.ExtObjectDescriptor(
            extName=typ.extName,
            extType=typ.extName.split('.').pop(),
            ident=typ.ident,
            modName=typ.modName,
            modPath=typ.modPath,
            classPtr=None,
        )

        return self._descCache[name]

    def register_object(self, classref, ext_type, cls):
        """Register a class under an extension name at run time.

        The full extension name is the name resolved from `classref` plus
        '.' plus `ext_type`. The class gets `extName` and `extType`
        attributes, and a descriptor pointing to it is put into the cache.

        Raises:
            Error: If `classref` does not resolve to an extension name.
        """
        _, _, ext_name = self.parse_classref(classref)
        if not ext_name:
            raise Error(f'invalid class reference {classref!r}')
        ext_name += '.' + ext_type
        cls.extName = ext_name
        cls.extType = ext_type
        self._descCache[ext_name] = gws.ExtObjectDescriptor(
            extName=ext_name,
            extType=ext_type,
            ident=cls.__name__,
            modName='',
            modPath='',
            classPtr=cls,
        )

    def get_class(self, classref, ext_type=None):
        """Resolve a class reference to a class, importing its module if needed.

        Args:
            classref: A class, a type name, or an extension reference.
            ext_type: Extension type to append to an extension name;
                defaults to `core.v.DEFAULT_VARIANT_TAG`.

        Returns:
            The class, or None if the reference matches no known type.

        Raises:
            LoadError: If the module containing the class cannot be imported.
        """
        cls, real_name, ext_name = self.parse_classref(classref)
        if cls:
            return cls

        desc = None
        if real_name:
            desc = self.object_descriptor(real_name)
        elif ext_name:
            desc = self.object_descriptor(ext_name + '.' + (ext_type or core.v.DEFAULT_VARIANT_TAG))
        if not desc:
            return

        if not desc.classPtr:
            # First use of this descriptor: import the module and remember the class.
            try:
                mod = gws.lib.importer.import_from_path(desc.modPath, gws.c.APP_DIR)
            except gws.lib.importer.Error as exc:
                raise LoadError(f'cannot load class {classref!r} from {desc.modPath!r}') from exc
            desc.classPtr = getattr(mod, desc.ident)
            desc.classPtr.extName = desc.extName
            desc.classPtr.extType = desc.extType

        return desc.classPtr

    def command_descriptor(self, command_category, command_name):
        """Return a command descriptor for a category and command name.

        A command known as 'raw' is always looked up in the raw category,
        whatever category was requested.

        Returns:
            A `gws.ExtCommandDescriptor`, or None if there is no such command.
        """
        if command_name in self.rawCommands and command_category != gws.CommandCategory.raw:
            command_category = gws.CommandCategory.raw

        name = core.v.EXT_COMMAND_PREFIX + str(command_category) + '.' + command_name

        if name in self._descCache:
            return self._descCache[name]

        typ = self.get_type(name)

        if not typ:
            return

        # NOTE(review): the descriptor is built anew on each call and is not
        # put into the cache, so the lookup above only hits if something else
        # stored this name. Callers may rely on getting a fresh object - confirm
        # before adding caching here.
        return gws.ExtCommandDescriptor(
            extName=typ.extName,
            extType=typ.extName.split('.').pop(),
            extCommandCategory=command_category,
            tArg=typ.tArg,
            tOwner=typ.tOwner,
            owner=self.object_descriptor(typ.tOwner),
            methodName=typ.ident,
        )

    def cli_commands(self, lang='en'):
        """List CLI commands with their arguments and documentation.

        Args:
            lang: Language of the doc strings; missing strings fall back to 'en'.

        Returns:
            A list of `gws.Data` entries with `cmd1`, `cmd2`, `doc` and `args`
            (sorted by name), sorted by `(cmd1, cmd2)`.
        """
        strings = self.strings.get(lang) or self.strings['en']
        cmds = []

        # e.g "gws.ext.command.cli.serverStart" -> [server, start]
        name_re = re.compile(r'\.([a-z]+)(\w+)$')

        for typ in self.serverTypes:
            # Types without an extension name cannot be CLI commands.
            if not typ.extName or not typ.extName.startswith(core.v.EXT_COMMAND_CLI_PREFIX):
                continue

            m = name_re.search(typ.extName)
            if not m:
                continue
            cmd1 = m.group(1)
            cmd2 = m.group(2).lower()

            args = []
            arg_typ = self.get_type(typ.tArg)
            if arg_typ:
                for name, prop_type_uid in arg_typ.tProperties.items():
                    prop_typ = self.get_type(prop_type_uid)
                    if not prop_typ:
                        continue
                    args.append(
                        gws.Data(
                            name=name,
                            type=prop_typ.tValue,
                            doc=strings.get(prop_type_uid) or self.strings['en'].get(prop_type_uid) or '',
                            defaultValue=prop_typ.defaultValue,
                            hasDefault=prop_typ.hasDefault,
                        )
                    )

            entry = gws.Data(
                cmd1=cmd1,
                cmd2=cmd2,
                doc=strings.get(typ.uid) or self.strings['en'].get(typ.uid) or '',
                args=sorted(args, key=lambda a: a.name),
            )
            cmds.append(entry)

        return sorted(cmds, key=lambda c: (c.cmd1, c.cmd2))

    def parse_classref(self, classref: gws.ClassRef) -> tuple[Optional[type], str, str]:
        """Split a class reference into `(class, real_name, ext_name)`.

        Exactly one element of the result is set:
        an extension reference yields `(None, '', ext_name)`,
        a string yields `(None, classref, '')`,
        a class yields `(classref, '', '')`.

        Raises:
            Error: If `classref` is none of the above.
        """
        ext_name = gws.ext.name_for(classref)
        if ext_name:
            return None, '', ext_name

        if isinstance(classref, str):
            return None, classref, ''

        if isinstance(classref, type):
            return classref, '', ''

        raise Error(f'invalid class reference {classref!r}')