Coverage for gws-app/gws/spec/runtime.py: 70%

138 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 23:09 +0200

1"""Validate values according to specs""" 

2 

3from typing import Optional 

4 

5import re 

6 

7import gws 

8import gws.lib.jsonx 

9import gws.lib.importer 

10 

11from . import core, reader 

12from .generator import main as generator_main 

13 

14Error = core.Error 

15ReadError = core.ReadError 

16GeneratorError = core.GeneratorError 

17LoadError = core.LoadError 

18 

19 

20def create(manifest_path: str = '', read_cache=False, write_cache=False) -> 'Object': 

21 sd = _get_specs(manifest_path, read_cache, write_cache) 

22 return Object(sd) 

23 

24 

25def _get_specs(manifest_path: str = '', read_cache=False, write_cache=False) -> core.SpecData: 

26 cache_path = gws.u.ensure_dir(gws.c.SPEC_DIR) + '/spec_' + gws.u.to_uid(manifest_path or '') + '.json' 

27 

28 if read_cache and gws.u.is_file(cache_path): 

29 try: 

30 specs = generator_main.from_path(cache_path) 

31 gws.log.debug(f'spec.create: loaded from {cache_path!r}') 

32 return specs 

33 except gws.lib.jsonx.Error: 

34 gws.log.exception(f'spec.create: load failed') 

35 

36 if manifest_path: 

37 gws.log.debug(f'spec.create: using manifest {manifest_path!r}...') 

38 

39 gws.debug.time_start('SPEC GENERATOR') 

40 specs = generator_main.generate(manifest_path=manifest_path) 

41 gws.debug.time_end() 

42 

43 if write_cache: 

44 try: 

45 generator_main.to_path(specs, cache_path) 

46 gws.log.debug(f'spec.create: stored to {cache_path!r}') 

47 except gws.lib.jsonx.Error: 

48 gws.log.exception(f'spec.create: store failed') 

49 

50 return specs 

51 

52 

53## 

54 

55 

56class Object(gws.SpecRuntime): 

57 def __init__(self, sd: core.SpecData): 

58 self.sd = sd 

59 self.manifest = gws.ApplicationManifest(sd.meta['manifest']) 

60 self.manifestPath = sd.meta['manifestPath'] 

61 self.version = sd.meta['version'] 

62 

63 self.serverTypes = sd.serverTypes 

64 self.serverTypesDict = {} 

65 for typ in self.serverTypes: 

66 self.serverTypesDict[typ.uid] = typ 

67 if typ.extName: 

68 self.serverTypesDict[typ.extName] = typ 

69 

70 self.strings = sd.strings 

71 self.chunks = sd.chunks 

72 

73 self.appBundlePaths = [] 

74 for chunk in self.chunks: 

75 path = chunk.bundleDir + '/' + gws.c.JS_BUNDLE 

76 if path not in self.appBundlePaths: 

77 self.appBundlePaths.append(path) 

78 

79 self._descCache = {} 

80 

81 def __getstate__(self): 

82 self._descCache = {} 

83 return vars(self) 

84 

85 def get_type(self, key): 

86 return self.serverTypesDict.get(key) 

87 

88 def read(self, value, type_name, path='', options=None): 

89 r = reader.Reader(self, path, options) 

90 return r.read(value, type_name) 

91 

92 def object_descriptor(self, name): 

93 if name in self._descCache: 

94 return self._descCache[name] 

95 

96 typ = self.get_type(name) 

97 if not typ: 

98 return 

99 

100 self._descCache[name] = gws.ExtObjectDescriptor( 

101 extName=typ.extName, 

102 extType=typ.extName.split('.').pop(), 

103 ident=typ.ident, 

104 modName=typ.modName, 

105 modPath=typ.modPath, 

106 classPtr=None 

107 ) 

108 

109 return self._descCache[name] 

110 

111 def register_object(self, classref, ext_type, cls): 

112 _, _, ext_name = self.parse_classref(classref) 

113 if not ext_name: 

114 raise Error(f'invalid class reference {classref!r}') 

115 ext_name += '.' + ext_type 

116 setattr(cls, 'extName', ext_name) 

117 setattr(cls, 'extType', ext_type) 

118 self._descCache[ext_name] = gws.ExtObjectDescriptor( 

119 extName=ext_name, 

120 extType=ext_type, 

121 ident=cls.__name__, 

122 modName='', 

123 modPath='', 

124 classPtr=cls 

125 ) 

126 

127 def get_class(self, classref, ext_type=None): 

128 cls, real_name, ext_name = self.parse_classref(classref) 

129 if cls: 

130 return cls 

131 

132 desc = None 

133 if real_name: 

134 desc = self.object_descriptor(real_name) 

135 elif ext_name: 

136 desc = self.object_descriptor(ext_name + '.' + (ext_type or core.v.DEFAULT_VARIANT_TAG)) 

137 if not desc: 

138 return 

139 

140 if not desc.classPtr: 

141 try: 

142 mod = gws.lib.importer.import_from_path(desc.modPath, gws.c.APP_DIR) 

143 except gws.lib.importer.Error as exc: 

144 raise LoadError(f'cannot load class {classref!r} from {desc.modPath!r}') from exc 

145 desc.classPtr = getattr(mod, desc.ident) 

146 setattr(desc.classPtr, 'extName', desc.extName) 

147 setattr(desc.classPtr, 'extType', desc.extType) 

148 

149 return desc.classPtr 

150 

151 def command_descriptor(self, command_category, command_name): 

152 name = core.v.EXT_COMMAND_PREFIX + str(command_category) + '.' + command_name 

153 

154 if name in self._descCache: 

155 return self._descCache[name] 

156 

157 typ = self.get_type(name) 

158 

159 if not typ: 

160 return 

161 

162 return gws.ExtCommandDescriptor( 

163 extName=typ.extName, 

164 extType=typ.extName.split('.').pop(), 

165 tArg=typ.tArg, 

166 tOwner=typ.tOwner, 

167 owner=self.object_descriptor(typ.tOwner), 

168 methodName=typ.ident, 

169 ) 

170 

171 def cli_commands(self, lang='en'): 

172 strings = self.strings.get(lang) or self.strings['en'] 

173 cmds = [] 

174 

175 for typ in self.serverTypes: 

176 if not typ.extName.startswith(core.v.EXT_COMMAND_CLI_PREFIX): 

177 continue 

178 

179 # e.g "gws.ext.command.cli.serverStart" -> [server, start] 

180 m = re.search(r'\.([a-z]+)(\w+)$', typ.extName) 

181 if not m: 

182 continue 

183 cmd1 = m.group(1) 

184 cmd2 = m.group(2).lower() 

185 

186 args = [] 

187 arg_typ = self.get_type(typ.tArg) 

188 if arg_typ: 

189 for name, prop_type_uid in arg_typ.tProperties.items(): 

190 prop_typ = self.get_type(prop_type_uid) 

191 if not prop_typ: 

192 continue 

193 args.append(gws.Data( 

194 name=name, 

195 type=prop_typ.tValue, 

196 doc=strings.get(prop_type_uid) or self.strings['en'].get(prop_type_uid) or '', 

197 defaultValue=prop_typ.defaultValue, 

198 hasDefault=prop_typ.hasDefault, 

199 )) 

200 

201 entry = gws.Data( 

202 cmd1=cmd1, 

203 cmd2=cmd2, 

204 doc=strings.get(typ.uid) or self.strings['en'].get(typ.uid) or '', 

205 args=sorted(args, key=lambda a: a.name), 

206 ) 

207 cmds.append(entry) 

208 

209 return sorted(cmds, key=lambda c: (c.cmd1, c.cmd2)) 

210 

211 def parse_classref(self, classref: gws.ClassRef) -> tuple[Optional[type], str, str]: 

212 ext_name = gws.ext.name_for(classref) 

213 if ext_name: 

214 return None, '', ext_name 

215 

216 if isinstance(classref, str): 

217 return None, classref, '' 

218 

219 if isinstance(classref, type): 

220 return classref, '', '' 

221 

222 raise Error(f'invalid class reference {classref!r}')