Coverage for gws-app/gws/config/parser.py: 17%
218 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""Configuration parser.
3Convert configuration files (in different formats) or raw config dicts
4into ``gws.Config`` objects by validating them against the specs.
5"""
7from typing import Optional, cast
9import os
10import yaml
12import gws
13import gws.lib.jsonx
14import gws.lib.osx
15import gws.lib.datetimex
16import gws.lib.importer
17import gws.lib.vendor.jump
18import gws.lib.vendor.slon
19import gws.spec.runtime
# Regular expression selecting configuration files by extension;
# passed to ``gws.lib.osx.find_files`` when project directories are scanned.
CONFIG_PATH_PATTERN = r'\.(py|json|yaml|cx)$'
def parse_from_path(path: str, as_type: str, ctx: gws.ConfigContext) -> Optional[gws.Config]:
    """Read a configuration file and validate its content as ``as_type``.

    Args:
        path: Path to the configuration file.
        as_type: Type of the configuration (e.g., 'gws.base.application.core.Config').
        ctx: Configuration context.

    Returns:
        The parsed configuration, or ``None`` if the file could not be read,
        produced no (or an empty) dict, or failed validation.
    """

    parser = _Parser(ctx)
    plain = parser.ensure_dict(parser.read_from_path(path), path)
    if not plain:
        return None
    return parser.parse_dict(plain, path, as_type)
def parse_dict(dct: dict | gws.Data, path: str, as_type: str, ctx: gws.ConfigContext) -> Optional[gws.Config]:
    """Validate an in-memory configuration as ``as_type``.

    Args:
        dct: Dictionary containing the configuration.
        path: Path to the configuration file (for error reporting).
        as_type: Type of the configuration.
        ctx: Configuration context.

    Returns:
        The parsed configuration, or ``None`` if ``dct`` is empty, is not
        convertible to a dict, or failed validation.
    """

    parser = _Parser(ctx)
    plain = parser.ensure_dict(dct, path)
    if not plain:
        return None
    return parser.parse_dict(plain, path, as_type)
def parse_app_from_path(path: str, ctx: gws.ConfigContext) -> Optional[gws.Config]:
    """Read the application configuration file and parse it with its projects.

    Args:
        path: Path to the application configuration file.
        ctx: Configuration context.

    Returns:
        The parsed application configuration, or ``None`` if the file could
        not be read, produced no (or an empty) dict, or failed validation.
    """

    parser = _Parser(ctx)
    plain = parser.ensure_dict(parser.read_from_path(path), path)
    if not plain:
        return None
    return _parse_app_dict(plain, path, parser)
def parse_app_dict(dct: dict | gws.Data, path: str, ctx: gws.ConfigContext) -> Optional[gws.Config]:
    """Parse an in-memory application configuration together with its projects.

    Args:
        dct: Dictionary containing the application configuration.
        path: Path to the configuration file (for error reporting).
        ctx: Configuration context.

    Returns:
        The parsed application configuration, or ``None`` if ``dct`` is
        empty, is not convertible to a dict, or failed validation.
    """

    parser = _Parser(ctx)
    plain = parser.ensure_dict(dct, path)
    if not plain:
        return None
    return _parse_app_dict(plain, path, parser)
def read_from_path(path: str, ctx: gws.ConfigContext) -> Optional[dict]:
    """Load a configuration file and return its content as a plain dict.

    The content is only read and converted, not validated against the specs.

    Args:
        path: Path to the configuration file.
        ctx: Configuration context.

    Returns:
        The configuration as a dict, or ``None`` if the file could not be
        read or does not contain a dict.
    """
    parser = _Parser(ctx)
    return parser.ensure_dict(parser.read_from_path(path), path)
95##
def _parse_app_dict(dct: dict, path, pp: '_Parser'):
    """Parse the application configuration together with its projects.

    Projects are collected from three places: the inline ``projects`` entry,
    the files listed in ``projectPaths`` and the configuration files found
    under ``projectDirs``.

    Args:
        dct: Application configuration dict.
        path: Path of the configuration file (for error reporting).
        pp: Parser whose context collects errors and visited paths.

    Returns:
        The application config with ``projectPaths``, ``projectDirs`` and
        ``projects`` set, or ``None`` if ``dct`` is not a dict or the main
        configuration could not be parsed.
    """
    dct = gws.u.to_dict(dct)
    if not isinstance(dct, dict):
        _register_error(pp.ctx, f'app config must be a dict: {path!r}')
        return

    # the timezone must be set before everything else;
    # 'server' may be present but null or malformed, in which case we skip it
    # here and let the spec reader report the problem
    server = dct.get('server') or {}
    get_option = getattr(server, 'get', None)
    tz = get_option('timeZone', '') if callable(get_option) else ''
    if tz:
        gws.lib.datetimex.set_local_time_zone(tz)
    gws.log.info(f'local time zone="{gws.lib.datetimex.time_zone()}"')

    # remove 'projects' from the config, parse them later on
    # (a null 'projects' entry is treated as "no inline projects")
    inline_projects = dct.pop('projects', None) or []

    gws.log.info('parsing main configuration...')
    app_cfg = pp.parse_dict(dct, path, as_type='gws.base.application.core.Config')
    if not app_cfg:
        return

    projects = []
    for dcts in inline_projects:
        projects.extend(_parse_projects(dcts, path, pp))

    project_paths = list(app_cfg.get('projectPaths') or [])
    project_dirs = list(app_cfg.get('projectDirs') or [])

    all_project_paths = list(project_paths)
    for dirname in project_dirs:
        all_project_paths.extend(gws.lib.osx.find_files(dirname, CONFIG_PATH_PATTERN, deep=True))

    # de-duplicate and sort so that projects are loaded in a stable order
    for pth in sorted(set(all_project_paths)):
        projects.extend(_parse_projects_from_path(pth, pp))

    app_cfg.set('projectPaths', project_paths)
    app_cfg.set('projectDirs', project_dirs)
    app_cfg.set('projects', projects)

    _save_debug(app_cfg, path, '.parsed.json')
    return app_cfg
def _parse_projects_from_path(path, pp: '_Parser'):
    """Read a project configuration file and parse every project in it.

    Args:
        path: Path of the project configuration file.
        pp: Parser used for reading and validation.

    Returns:
        A list of parsed project configs; empty if the file yields nothing.
    """
    content = pp.read_from_path(path)
    return _parse_projects(content, path, pp) if content else []
def _parse_projects(cfg_list, path, pp: '_Parser'):
    """Parse one or more (possibly nested) project configurations.

    Args:
        cfg_list: A single project config or an arbitrarily nested
            list/tuple of them.
        path: Path of the configuration file (for error reporting).
        pp: Parser used for validation.

    Returns:
        A list of the project configs that were parsed successfully.
    """
    parsed = []

    for item in _as_flat_list(cfg_list):
        plain = pp.ensure_dict(item, path)
        prj_cfg = pp.parse_dict(plain, path, 'gws.ext.config.project') if plain else None
        if prj_cfg:
            parsed.append(prj_cfg)

    return parsed
161##
class _Parser:
    """Reads configuration sources and validates them against the specs.

    Problems are not raised to the caller: they are appended to
    ``ctx.errors`` as ``gws.ConfigurationError`` objects. Every file that is
    read or included is recorded in ``ctx.paths``.
    """

    def __init__(self, ctx: gws.ConfigContext):
        """Bind the parser to ``ctx`` and fill in its missing containers.

        Args:
            ctx: Configuration context. Falsy ``errors``, ``paths`` and
                ``readOptions`` are replaced with empty containers.
        """
        self.ctx = ctx
        self.ctx.errors = ctx.errors or []
        self.ctx.paths = ctx.paths or set()
        self.ctx.readOptions = ctx.readOptions or set()
        # always request verbose errors from the spec reader
        self.ctx.readOptions.add(gws.SpecReadOption.verboseErrors)

    def ensure_dict(self, val, path):
        """Convert ``val`` to a plain dict.

        Args:
            val: Value to convert (dict, ``gws.Data`` or ``None``).
            path: Path of the configuration file (for error reporting).

        Returns:
            A plain dict, or ``None`` if ``val`` is ``None`` or is not
            convertible to a dict (an error is registered in the latter case).
        """
        if val is None:
            return
        d = _to_plain(val)
        if not isinstance(d, dict):
            _register_error(self.ctx, f'unsupported configuration type: {type(val)!r}', path)
            return
        return d

    def parse_dict(self, dct: dict, path: str, as_type: str) -> Optional[gws.Config]:
        """Validate ``dct`` against the spec type ``as_type``.

        Args:
            dct: Plain configuration dict.
            path: Path of the configuration file (for error reporting).
            as_type: Name of the spec type to read the dict as.

        Returns:
            The parsed configuration, or ``None`` if ``dct`` is not a dict
            or the spec reader raised a ``ReadError`` (an error is registered
            in both cases).
        """
        if not isinstance(dct, dict):
            _register_error(self.ctx, 'unsupported configuration', path)
            return
        if path:
            _register_path(self.ctx, path)
        try:
            cfg = self.ctx.specs.read(
                dct,
                as_type,
                path=path,
                options=self.ctx.readOptions,
            )
            return cast(gws.Config, cfg)
        except gws.spec.runtime.ReadError as exc:
            # the error carries three args; the last one is a dict of details
            message, _, details = exc.args
            lines = []
            pp = details.get('path')
            if pp:
                lines.append(f'PATH: {pp!r}')
            pp = details.get('formatted_value')
            if pp:
                lines.append(f'VALUE: {pp}')
            pp = details.get('formatted_stack')
            if pp:
                lines.extend(pp)
            _register_error(self.ctx, f'parse error: {message}', *lines, cause=exc)

    def read_from_path(self, path: str):
        """Read a configuration file and convert its content to plain values.

        A non-empty result is also written to the config directory as a
        ``.src.json`` debug file.

        Args:
            path: Path of the configuration file.

        Returns:
            The converted content, or a falsy value if the file is missing,
            empty or could not be read.
        """
        if not os.path.isfile(path):
            _register_error(self.ctx, f'file not found: {path!r}')
            return

        _register_path(self.ctx, path)
        r = self.read2(path)

        if r:
            r = _to_plain(r)
            _save_debug(r, path, '.src.json')
        return r

    def read2(self, path: str):
        """Dispatch to the reader matching the extension of ``path``.

        Args:
            path: Path of the configuration file.

        Returns:
            Whatever the format-specific reader returns, or ``None`` (with an
            error registered) for an unknown extension.
        """
        if path.endswith('.py'):
            return self.read_py(path)
        if path.endswith('.json'):
            return self.read_json(path)
        # accept both spellings: CONFIG_PATH_PATTERN discovers '.yaml' files,
        # while '.yml' has been accepted here before
        if path.endswith(('.yaml', '.yml')):
            return self.read_yaml(path)
        if path.endswith('.cx'):
            return self.read_cx(path)

        _register_error(self.ctx, 'unsupported configuration', path)

    def read_py(self, path: str):
        """Load a python file and return the result of its ``main(ctx)``.

        Args:
            path: Path of the python configuration file.

        Returns:
            The value returned by ``main``, or ``None`` if there is no
            ``main`` or an exception occurred (an error is registered).
        """
        try:
            fn = gws.lib.importer.load_file(path).get('main')
            if not fn:
                _register_error(self.ctx, f'no "main" function found in {path!r}')
                return
            return fn(self.ctx)
        except Exception as exc:
            # user code may fail in any way: log the traceback and carry on
            gws.log.exception()
            _register_error(self.ctx, f'python error: {exc}', cause=exc)

    def read_json(self, path: str):
        """Read a JSON configuration file.

        Args:
            path: Path of the JSON file.

        Returns:
            The decoded content, or ``None`` on failure (an error is registered).
        """
        try:
            return gws.lib.jsonx.from_path(path)
        except Exception as exc:
            _register_error(self.ctx, 'json error', cause=exc)

    def read_yaml(self, path: str):
        """Read a YAML configuration file using the safe loader.

        Args:
            path: Path of the YAML file.

        Returns:
            The decoded content, or ``None`` on failure (an error is registered).
        """
        try:
            with open(path, encoding='utf8') as fp:
                return yaml.safe_load(fp)
        except Exception as exc:
            _register_error(self.ctx, 'yaml error', cause=exc)

    def read_cx(self, path: str):
        """Read a ``.cx`` file: a jump template whose output is parsed as SLON.

        The template is compiled, called with a small set of arguments
        (``true``, ``false``, ``ctx``, ``gws``) and its output is passed to
        the SLON parser. The intermediate output is saved as a ``.src.slon``
        debug file.

        Args:
            path: Path of the ``.cx`` file.

        Returns:
            The parsed object, or ``None`` if compiling, calling or parsing
            failed (syntax errors are registered).
        """
        error_count = 0

        def _error_handler(exc, path, line, env):
            # count runtime template errors; the result is discarded if any occur
            nonlocal error_count
            _register_syntax_error(self.ctx, path, gws.u.read_file(path), repr(exc), line)
            error_count += 1
            # NOTE(review): presumably tells jump the error was handled - confirm
            return True

        def _loader(cur_path, load_path):
            # relative includes are resolved against the including file's directory
            if not os.path.isabs(load_path):
                load_path = os.path.abspath(os.path.dirname(cur_path) + '/' + load_path)
            _register_path(self.ctx, load_path)
            return gws.u.read_file(load_path), load_path

        try:
            tpl = gws.lib.vendor.jump.compile_path(path, loader=_loader)
        except gws.lib.vendor.jump.CompileError as exc:
            # the error may originate in an included file: report the file
            # the source snippet is actually taken from
            err_path = exc.path or path
            _register_syntax_error(self.ctx, err_path, gws.u.read_file(err_path), exc.message, exc.line, cause=exc)
            return

        args = {
            'true': True,
            'false': False,
            'ctx': self.ctx,
            'gws': gws,
        }

        slon = gws.lib.vendor.jump.call(tpl, args, error=_error_handler)
        if error_count > 0:
            return

        _save_debug(slon, path, '.src.slon')

        try:
            return gws.lib.vendor.slon.loads(slon, as_object=True)
        except gws.lib.vendor.slon.SlonError as exc:
            # the first error arg is used as the message, the third as the line number
            _register_syntax_error(self.ctx, path, slon, exc.args[0], exc.args[2], cause=exc)
297##
def _register_path(ctx, path):
    """Record ``path`` in the set of paths kept by the context.

    Args:
        ctx: Configuration context; its ``paths`` attribute must support ``add``.
        path: Path to record.
    """
    known_paths = ctx.paths
    known_paths.add(path)
def _register_error(ctx, message, *args, cause=None):
    """Append a ``gws.ConfigurationError`` to the errors kept by the context.

    Args:
        ctx: Configuration context; the error is appended to ``ctx.errors``.
        message: Error message.
        *args: Additional error arguments (e.g. detail lines).
        cause: Optional original exception, attached as ``__cause__``.
    """
    error = gws.ConfigurationError(message, *args)
    if cause:
        error.__cause__ = cause
    ctx.errors.append(error)
def _register_syntax_error(ctx, path, src, message, line, context=10, cause=None):
    """Register a syntax error together with an excerpt of the source.

    The excerpt contains the offending line, marked with ``>>>``, and up to
    ``context`` numbered lines before and after it.

    Args:
        ctx: Configuration context receiving the error.
        path: Path of the file the source comes from.
        src: Source text.
        message: Error message.
        line: 1-based number of the offending line.
        context: Number of lines to show on each side of the offending line.
        cause: Optional original exception.
    """
    report = [f'PATH: {path!r}']

    for number, text in enumerate(src.splitlines(), 1):
        if number < line - context:
            continue
        if number > line + context:
            break
        marker = '>>> ' if number == line else ''
        report.append(f'{marker}{number}: {text}')

    _register_error(ctx, f'syntax error: {message}', *report, cause=cause)
def _save_debug(src, src_path, ext):
    """Write a debug copy of ``src`` to the config directory.

    The file name is built from the uid of ``src_path`` plus ``ext``.

    Args:
        src: Content to write; non-string content is serialized to pretty
            JSON when ``ext`` ends with ``.json``.
        src_path: Path of the file the content was derived from.
        ext: Extension appended to the debug file name.
    """
    needs_json = ext.endswith('.json') and not isinstance(src, str)
    text = gws.lib.jsonx.to_pretty_string(src) if needs_json else src
    target = f'{gws.c.CONFIG_DIR}/{gws.u.to_uid(src_path)}{ext}'
    gws.u.write_file(target, text)
def _as_flat_list(ls):
    """Yield the leaves of an arbitrarily nested list/tuple, depth-first.

    A value that is neither a list nor a tuple is yielded as is.

    Args:
        ls: A single value or a nested list/tuple of values.

    Yields:
        Each non-list, non-tuple element in left-to-right order.
    """
    if not isinstance(ls, (list, tuple)):
        yield ls
        return

    # explicit stack of iterators instead of recursion
    pending = [iter(ls)]
    while pending:
        for item in pending[-1]:
            if isinstance(item, (list, tuple)):
                # descend; the outer iterator resumes once this one is exhausted
                pending.append(iter(item))
                break
            yield item
        else:
            pending.pop()
def _to_plain(val):
    """Recursively convert ``gws.Data`` objects and tuples to dicts and lists.

    Values stored under keys starting with an underscore are kept as is.

    Args:
        val: Any value.

    Returns:
        Lists for lists/tuples, dicts for dicts/``gws.Data`` objects,
        everything else unchanged.
    """
    if isinstance(val, (list, tuple)):
        return [_to_plain(item) for item in val]

    if isinstance(val, gws.Data):
        val = vars(val)

    if not isinstance(val, dict):
        return val

    plain = {}
    for key, item in val.items():
        plain[key] = item if key.startswith('_') else _to_plain(item)
    return plain