Coverage for gws-app / gws / spec / runtime.py: 70%
143 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Validate values according to specs"""
3from typing import Optional
5import re
7import gws
8import gws.lib.jsonx
9import gws.lib.importer
11from . import core, reader
12from .generator import main as generator_main
14Error = core.Error
15ReadError = core.ReadError
16GeneratorError = core.GeneratorError
17LoadError = core.LoadError
20def create(manifest_path: str = '', read_cache=False, write_cache=False) -> 'Object':
21 sd = _get_specs(manifest_path, read_cache, write_cache)
22 return Object(sd)
25def _get_specs(manifest_path: str = '', read_cache=False, write_cache=False) -> core.SpecData:
26 cache_path = gws.u.ensure_dir(gws.c.SPEC_DIR) + '/spec_' + gws.u.to_uid(manifest_path or '') + '.json'
28 if read_cache and gws.u.is_file(cache_path):
29 try:
30 specs = generator_main.from_path(cache_path)
31 gws.log.debug(f'spec.create: loaded from {cache_path!r}')
32 return specs
33 except gws.lib.jsonx.Error:
34 gws.log.exception(f'spec.create: load failed')
36 if manifest_path:
37 gws.log.debug(f'spec.create: using manifest {manifest_path!r}...')
39 gws.debug.time_start('SPEC GENERATOR')
40 specs = generator_main.generate(manifest_path=manifest_path)
41 gws.debug.time_end()
43 if write_cache:
44 try:
45 generator_main.to_path(specs, cache_path)
46 gws.log.debug(f'spec.create: stored to {cache_path!r}')
47 except gws.lib.jsonx.Error:
48 gws.log.exception(f'spec.create: store failed')
50 return specs
53##
56class Object(gws.SpecRuntime):
57 def __init__(self, sd: core.SpecData):
58 self.sd = sd
59 self.manifest = gws.ApplicationManifest(sd.meta['manifest'])
60 self.manifestPath = sd.meta['manifestPath']
61 self.version = sd.meta['version']
63 self.serverTypes = sd.serverTypes
64 self.serverTypesDict = {}
65 self.rawCommands = set()
67 for typ in self.serverTypes:
68 self.serverTypesDict[typ.uid] = typ
69 if typ.extName:
70 self.serverTypesDict[typ.extName] = typ
71 if typ.extName.startswith(core.v.EXT_COMMAND_PREFIX + str(gws.CommandCategory.raw) + '.'):
72 self.rawCommands.add(typ.extName.split('.')[-1])
74 self.strings = sd.strings
75 self.chunks = sd.chunks
77 self.appBundlePaths = []
78 for chunk in self.chunks:
79 path = chunk.bundleDir + '/' + gws.c.JS_BUNDLE
80 if path not in self.appBundlePaths:
81 self.appBundlePaths.append(path)
83 self._descCache = {}
85 def __getstate__(self):
86 self._descCache = {}
87 return vars(self)
89 def get_type(self, key):
90 return self.serverTypesDict.get(key)
92 def read(self, value, type_name, path='', options=None):
93 r = reader.Reader(self, path, options)
94 return r.read(value, type_name)
96 def object_descriptor(self, name):
97 if name in self._descCache:
98 return self._descCache[name]
100 typ = self.get_type(name)
101 if not typ:
102 return
104 self._descCache[name] = gws.ExtObjectDescriptor(
105 extName=typ.extName,
106 extType=typ.extName.split('.').pop(),
107 ident=typ.ident,
108 modName=typ.modName,
109 modPath=typ.modPath,
110 classPtr=None,
111 )
113 return self._descCache[name]
115 def register_object(self, classref, ext_type, cls):
116 _, _, ext_name = self.parse_classref(classref)
117 if not ext_name:
118 raise Error(f'invalid class reference {classref!r}')
119 ext_name += '.' + ext_type
120 setattr(cls, 'extName', ext_name)
121 setattr(cls, 'extType', ext_type)
122 self._descCache[ext_name] = gws.ExtObjectDescriptor(
123 extName=ext_name,
124 extType=ext_type,
125 ident=cls.__name__,
126 modName='',
127 modPath='',
128 classPtr=cls,
129 )
131 def get_class(self, classref, ext_type=None):
132 cls, real_name, ext_name = self.parse_classref(classref)
133 if cls:
134 return cls
136 desc = None
137 if real_name:
138 desc = self.object_descriptor(real_name)
139 elif ext_name:
140 desc = self.object_descriptor(ext_name + '.' + (ext_type or core.v.DEFAULT_VARIANT_TAG))
141 if not desc:
142 return
144 if not desc.classPtr:
145 try:
146 mod = gws.lib.importer.import_from_path(desc.modPath, gws.c.APP_DIR)
147 except gws.lib.importer.Error as exc:
148 raise LoadError(f'cannot load class {classref!r} from {desc.modPath!r}') from exc
149 desc.classPtr = getattr(mod, desc.ident)
150 setattr(desc.classPtr, 'extName', desc.extName)
151 setattr(desc.classPtr, 'extType', desc.extType)
153 return desc.classPtr
155 def command_descriptor(self, command_category, command_name):
156 if command_name in self.rawCommands and command_category != gws.CommandCategory.raw:
157 command_category = gws.CommandCategory.raw
159 name = core.v.EXT_COMMAND_PREFIX + str(command_category) + '.' + command_name
161 if name in self._descCache:
162 return self._descCache[name]
164 typ = self.get_type(name)
166 if not typ:
167 return
169 return gws.ExtCommandDescriptor(
170 extName=typ.extName,
171 extType=typ.extName.split('.').pop(),
172 extCommandCategory=command_category,
173 tArg=typ.tArg,
174 tOwner=typ.tOwner,
175 owner=self.object_descriptor(typ.tOwner),
176 methodName=typ.ident,
177 )
179 def cli_commands(self, lang='en'):
180 strings = self.strings.get(lang) or self.strings['en']
181 cmds = []
183 for typ in self.serverTypes:
184 if not typ.extName.startswith(core.v.EXT_COMMAND_CLI_PREFIX):
185 continue
187 # e.g "gws.ext.command.cli.serverStart" -> [server, start]
188 m = re.search(r'\.([a-z]+)(\w+)$', typ.extName)
189 if not m:
190 continue
191 cmd1 = m.group(1)
192 cmd2 = m.group(2).lower()
194 args = []
195 arg_typ = self.get_type(typ.tArg)
196 if arg_typ:
197 for name, prop_type_uid in arg_typ.tProperties.items():
198 prop_typ = self.get_type(prop_type_uid)
199 if not prop_typ:
200 continue
201 args.append(
202 gws.Data(
203 name=name,
204 type=prop_typ.tValue,
205 doc=strings.get(prop_type_uid) or self.strings['en'].get(prop_type_uid) or '',
206 defaultValue=prop_typ.defaultValue,
207 hasDefault=prop_typ.hasDefault,
208 )
209 )
211 entry = gws.Data(
212 cmd1=cmd1,
213 cmd2=cmd2,
214 doc=strings.get(typ.uid) or self.strings['en'].get(typ.uid) or '',
215 args=sorted(args, key=lambda a: a.name),
216 )
217 cmds.append(entry)
219 return sorted(cmds, key=lambda c: (c.cmd1, c.cmd2))
221 def parse_classref(self, classref: gws.ClassRef) -> tuple[Optional[type], str, str]:
222 ext_name = gws.ext.name_for(classref)
223 if ext_name:
224 return None, '', ext_name
226 if isinstance(classref, str):
227 return None, classref, ''
229 if isinstance(classref, type):
230 return classref, '', ''
232 raise Error(f'invalid class reference {classref!r}')