Coverage for gws-app/gws/config/parser.py: 17%

218 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""Configuration parser. 

2 

3Convert configuration files (in different formats) or row config dicts 

4into ``gws.Config`` objects by validating them against the specs. 

5""" 

6 

7from typing import Optional, cast 

8 

9import os 

10import yaml 

11 

12import gws 

13import gws.lib.jsonx 

14import gws.lib.osx 

15import gws.lib.datetimex 

16import gws.lib.importer 

17import gws.lib.vendor.jump 

18import gws.lib.vendor.slon 

19import gws.spec.runtime 

20 

21CONFIG_PATH_PATTERN = r'\.(py|json|yaml|cx)$' 

22 

23 

24def parse_from_path(path: str, as_type: str, ctx: gws.ConfigContext) -> Optional[gws.Config]: 

25 """Parse a configuration from a path. 

26 

27 Args: 

28 path: Path to the configuration file. 

29 as_type: Type of the configuration (e.g., 'gws.base.application.core.Config'). 

30 ctx: Configuration context. 

31 """ 

32 

33 pp = _Parser(ctx) 

34 val = pp.read_from_path(path) 

35 d = pp.ensure_dict(val, path) 

36 return pp.parse_dict(d, path, as_type) if d else None 

37 

38 

39def parse_dict(dct: dict | gws.Data, path: str, as_type: str, ctx: gws.ConfigContext) -> Optional[gws.Config]: 

40 """Parse a configuration given as python dict. 

41 

42 Args: 

43 dct: Dictionary containing the configuration. 

44 path: Path to the configuration file (for error reporting). 

45 as_type: Type of the configuration. 

46 ctx: Configuration context. 

47 """ 

48 

49 pp = _Parser(ctx) 

50 d = pp.ensure_dict(dct, path) 

51 return pp.parse_dict(d, path, as_type) if d else None 

52 

53 

54def parse_app_from_path(path: str, ctx: gws.ConfigContext) -> Optional[gws.Config]: 

55 """Parse application configuration from a path. 

56 

57 Args: 

58 path: Path to the application configuration file. 

59 ctx: Configuration context. 

60 """ 

61 

62 pp = _Parser(ctx) 

63 val = pp.read_from_path(path) 

64 d = pp.ensure_dict(val, path) 

65 return _parse_app_dict(d, path, pp) if d else None 

66 

67 

68def parse_app_dict(dct: dict | gws.Data, path: str, ctx: gws.ConfigContext) -> Optional[gws.Config]: 

69 """Parse application configuration given as python dict. 

70 

71 Args: 

72 dct: Dictionary containing the application configuration. 

73 path: Path to the configuration file (for error reporting). 

74 ctx: Configuration context. 

75 """ 

76 

77 pp = _Parser(ctx) 

78 d = pp.ensure_dict(dct, path) 

79 return _parse_app_dict(d, path, pp) if d else None 

80 

81 

82def read_from_path(path: str, ctx: gws.ConfigContext) -> Optional[dict]: 

83 """Read a configuration file from a path, parse config formats. 

84 

85 Args: 

86 path: Path to the configuration file. 

87 ctx: Configuration context. 

88 """ 

89 pp = _Parser(ctx) 

90 val = pp.read_from_path(path) 

91 d = pp.ensure_dict(val, path) 

92 return d 

93 

94 

95## 

96 

97 

98def _parse_app_dict(dct: dict, path, pp: '_Parser'): 

99 dct = gws.u.to_dict(dct) 

100 if not isinstance(dct, dict): 

101 _register_error(pp.ctx, f'app config must be a dict: {path!r}') 

102 return 

103 

104 # the timezone must be set before everything else 

105 tz = dct.get('server', {}).get('timeZone', '') 

106 if tz: 

107 gws.lib.datetimex.set_local_time_zone(tz) 

108 gws.log.info(f'local time zone="{gws.lib.datetimex.time_zone()}"') 

109 

110 # remove 'projects' from the config, parse them later on 

111 inline_projects = dct.pop('projects', []) 

112 

113 gws.log.info('parsing main configuration...') 

114 app_cfg = pp.parse_dict(dct, path, as_type='gws.base.application.core.Config') 

115 if not app_cfg: 

116 return 

117 

118 projects = [] 

119 for dcts in inline_projects: 

120 projects.extend(_parse_projects(dcts, path, pp)) 

121 

122 project_paths = list(app_cfg.get('projectPaths') or []) 

123 project_dirs = list(app_cfg.get('projectDirs') or []) 

124 

125 all_project_paths = list(project_paths) 

126 for dirname in project_dirs: 

127 all_project_paths.extend(gws.lib.osx.find_files(dirname, CONFIG_PATH_PATTERN, deep=True)) 

128 

129 for pth in sorted(set(all_project_paths)): 

130 projects.extend(_parse_projects_from_path(pth, pp)) 

131 

132 app_cfg.set('projectPaths', project_paths) 

133 app_cfg.set('projectDirs', project_dirs) 

134 app_cfg.set('projects', projects) 

135 

136 _save_debug(app_cfg, path, '.parsed.json') 

137 return app_cfg 

138 

139 

140def _parse_projects_from_path(path, pp: '_Parser'): 

141 cfg_list = pp.read_from_path(path) 

142 if not cfg_list: 

143 return [] 

144 return _parse_projects(cfg_list, path, pp) 

145 

146 

147def _parse_projects(cfg_list, path, pp: '_Parser'): 

148 ps = [] 

149 

150 for c in _as_flat_list(cfg_list): 

151 d = pp.ensure_dict(c, path) 

152 if not d: 

153 continue 

154 prj_cfg = pp.parse_dict(d, path, 'gws.ext.config.project') 

155 if prj_cfg: 

156 ps.append(prj_cfg) 

157 

158 return ps 

159 

160 

161## 

162 

163 

164class _Parser: 

165 def __init__(self, ctx: gws.ConfigContext): 

166 self.ctx = ctx 

167 self.ctx.errors = ctx.errors or [] 

168 self.ctx.paths = ctx.paths or set() 

169 self.ctx.readOptions = ctx.readOptions or set() 

170 self.ctx.readOptions.add(gws.SpecReadOption.verboseErrors) 

171 

172 def ensure_dict(self, val, path): 

173 if val is None: 

174 return 

175 d = _to_plain(val) 

176 if not isinstance(d, dict): 

177 _register_error(self.ctx, f'unsupported configuration type: {type(val)!r}', path) 

178 return 

179 return d 

180 

181 def parse_dict(self, dct: dict, path: str, as_type: str) -> Optional[gws.Config]: 

182 if not isinstance(dct, dict): 

183 _register_error(self.ctx, 'unsupported configuration', path) 

184 return 

185 if path: 

186 _register_path(self.ctx, path) 

187 try: 

188 cfg = self.ctx.specs.read( 

189 dct, 

190 as_type, 

191 path=path, 

192 options=self.ctx.readOptions, 

193 ) 

194 return cast(gws.Config, cfg) 

195 except gws.spec.runtime.ReadError as exc: 

196 message, _, details = exc.args 

197 lines = [] 

198 pp = details.get('path') 

199 if pp: 

200 lines.append(f'PATH: {pp!r}') 

201 pp = details.get('formatted_value') 

202 if pp: 

203 lines.append(f'VALUE: {pp}') 

204 pp = details.get('formatted_stack') 

205 if pp: 

206 lines.extend(pp) 

207 _register_error(self.ctx, f'parse error: {message}', *lines, cause=exc) 

208 

209 def read_from_path(self, path: str): 

210 if not os.path.isfile(path): 

211 _register_error(self.ctx, f'file not found: {path!r}') 

212 return 

213 

214 _register_path(self.ctx, path) 

215 r = self.read2(path) 

216 

217 if r: 

218 r = _to_plain(r) 

219 _save_debug(r, path, '.src.json') 

220 return r 

221 

222 def read2(self, path: str): 

223 if path.endswith('.py'): 

224 return self.read_py(path) 

225 if path.endswith('.json'): 

226 return self.read_json(path) 

227 if path.endswith('.yml'): 

228 return self.read_yaml(path) 

229 if path.endswith('.cx'): 

230 return self.read_cx(path) 

231 

232 _register_error(self.ctx, 'unsupported configuration', path) 

233 

234 def read_py(self, path: str): 

235 try: 

236 fn = gws.lib.importer.load_file(path).get('main') 

237 if not fn: 

238 _register_error(self.ctx, f'no "main" function found in {path!r}') 

239 return 

240 return fn(self.ctx) 

241 except Exception as exc: 

242 gws.log.exception() 

243 _register_error(self.ctx, f'python error: {exc}', cause=exc) 

244 

245 def read_json(self, path: str): 

246 try: 

247 return gws.lib.jsonx.from_path(path) 

248 except Exception as exc: 

249 _register_error(self.ctx, 'json error', cause=exc) 

250 

251 def read_yaml(self, path: str): 

252 try: 

253 with open(path, encoding='utf8') as fp: 

254 return yaml.safe_load(fp) 

255 except Exception as exc: 

256 _register_error(self.ctx, 'yaml error', cause=exc) 

257 

258 def read_cx(self, path: str): 

259 err_cnt = [0] 

260 

261 def _error_handler(exc, path, line, env): 

262 _register_syntax_error(self.ctx, path, gws.u.read_file(path), repr(exc), line) 

263 err_cnt[0] += 1 

264 return True 

265 

266 def _loader(cur_path, load_path): 

267 if not os.path.isabs(load_path): 

268 load_path = os.path.abspath(os.path.dirname(cur_path) + '/' + load_path) 

269 _register_path(self.ctx, load_path) 

270 return gws.u.read_file(load_path), load_path 

271 

272 try: 

273 tpl = gws.lib.vendor.jump.compile_path(path, loader=_loader) 

274 except gws.lib.vendor.jump.CompileError as exc: 

275 _register_syntax_error(self.ctx, path, gws.u.read_file(exc.path), exc.message, exc.line, cause=exc) 

276 return 

277 

278 args = args = { 

279 'true': True, 

280 'false': False, 

281 'ctx': self.ctx, 

282 'gws': gws, 

283 } 

284 

285 slon = gws.lib.vendor.jump.call(tpl, args, error=_error_handler) 

286 if err_cnt[0] > 0: 

287 return 

288 

289 _save_debug(slon, path, '.src.slon') 

290 

291 try: 

292 return gws.lib.vendor.slon.loads(slon, as_object=True) 

293 except gws.lib.vendor.slon.SlonError as exc: 

294 _register_syntax_error(self.ctx, path, slon, exc.args[0], exc.args[2], cause=exc) 

295 

296 

297## 

298 

299 

300def _register_path(ctx, path): 

301 ctx.paths.add(path) 

302 

303 

304def _register_error(ctx, message, *args, cause=None): 

305 err = gws.ConfigurationError(message, *args) 

306 if cause: 

307 err.__cause__ = cause 

308 ctx.errors.append(err) 

309 

310 

311def _register_syntax_error(ctx, path, src, message, line, context=10, cause=None): 

312 lines = [f'PATH: {path!r}'] 

313 

314 for n, ln in enumerate(src.splitlines(), 1): 

315 if n < line - context: 

316 continue 

317 if n > line + context: 

318 break 

319 ln = f'{n}: {ln}' 

320 if n == line: 

321 ln = f'>>> {ln}' 

322 lines.append(ln) 

323 

324 _register_error(ctx, f'syntax error: {message}', *lines, cause=cause) 

325 

326 

327def _save_debug(src, src_path, ext): 

328 if ext.endswith('.json') and not isinstance(src, str): 

329 src = gws.lib.jsonx.to_pretty_string(src) 

330 gws.u.write_file(f'{gws.c.CONFIG_DIR}/{gws.u.to_uid(src_path)}{ext}', src) 

331 

332 

333def _as_flat_list(ls): 

334 if not isinstance(ls, (list, tuple)): 

335 yield ls 

336 else: 

337 for x in ls: 

338 yield from _as_flat_list(x) 

339 

340 

341def _to_plain(val): 

342 if isinstance(val, (list, tuple)): 

343 return [_to_plain(x) for x in val] 

344 if isinstance(val, gws.Data): 

345 val = vars(val) 

346 if isinstance(val, dict): 

347 return {k: v if k.startswith('_') else _to_plain(v) for k, v in val.items()} 

348 return val