Coverage for gws-app / gws / test / util.py: 86%

278 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""Test utilities.""" 

2 

3import contextlib 

4import math 

5import os 

6import re 

7import typing 

8from typing import Optional 

9 

10import pytest 

11import requests 

12import werkzeug.test 

13 

14import gws 

15import gws.base.auth 

16import gws.base.feature 

17import gws.base.shape 

18import gws.base.web.wsgi_app 

19import gws.config 

20import gws.lib.crs 

21import gws.lib.cli 

22import gws.lib.jsonx 

23import gws.lib.net 

24import gws.lib.sa as sa 

25import gws.lib.vendor.slon 

26import gws.spec.runtime 

27import gws.test.mock 

28 

29## 

30 

31mock = gws.test.mock 

32 

33fixture = pytest.fixture 

34raises = pytest.raises 

35 

36monkey_patch = pytest.MonkeyPatch.context 

37 

38cast = typing.cast 

39 

40exec = gws.lib.cli.exec 

41 

42## 

43 

44OPTIONS = {} 

45 

46 

47def option(name, default=None): 

48 return OPTIONS.get(name, default) 

49 

50 

51def load_options(base_dir): 

52 global OPTIONS 

53 

54 OPTIONS = gws.lib.jsonx.from_path(f'{base_dir}/config/OPTIONS.json') 

55 OPTIONS['BASE_DIR'] = base_dir 

56 

57 ensure_dir(OPTIONS['BASE_DIR'] + '/tmp', clear=True) 

58 

59 if not gws.env.GWS_IN_CONTAINER: 

60 # if we are not in a container, use 'localhost:exposed_port' for all services 

61 for k, v in OPTIONS.items(): 

62 if k.endswith('.host'): 

63 OPTIONS[k] = 'localhost' 

64 if k.endswith('.port'): 

65 OPTIONS[k] = OPTIONS[k.replace('.port', '.expose_port')] 

66 

67 

68## 

69 

70 

71DEFAULT_METADATA = { 

72 'language': 'de', 

73 # 

74 'contactAddress': 'contactAddress 123', 

75 'contactAddressType': 'postal', 

76 'contactArea': 'contactArea1', 

77 'contactCity': 'contactCity1', 

78 'contactCountry': 'contactCountry1', 

79 'contactEmail': 'contact@example.de', 

80 'contactFax': '+49 (0)123-45', 

81 'contactOrganization': 'contactOrganization1', 

82 'contactPerson': 'contactPerson1', 

83 'contactPhone': '+49 (0)123-45', 

84 'contactPosition': 'contactPosition1', 

85 'contactProviderName': 'contactProviderName1', 

86 'contactProviderSite': 'contactProviderSite1', 

87 'contactRole': 'pointOfContact', 

88 'contactUrl': 'https://contact.@example.de', 

89 'contactZip': '12345', 

90 # 

91 'catalogUid': 'catalogUid1', 

92 'temporalBegin': '2021-01-02', 

93 'temporalEnd': '2022-01-02', 

94 'dateCreated': '2023-01-02', 

95 'dateUpdated': '2024-01-02', 

96 # 

97 # 'inspireMandatoryKeyword': 'infoMapAccessService', 

98 'inspireDegreeOfConformity': 'conformant', 

99 'inspireResourceType': 'service', 

100 # 'inspireSpatialDataServiceType': 'view', 

101 'inspireSpatialScope': 'national', 

102 'inspireSpatialScopeName': 'national', 

103 'inspireTheme': 'administrativeUnits', 

104 # 

105 'isoMaintenanceFrequencyCode': 'annual', 

106 'isoQualityConformanceExplanation': 'isoQualityConformanceExplanation1', 

107 'isoQualityConformanceQualityPass': True, 

108 'isoQualityConformanceSpecificationDate': '2020-01-02', 

109 'isoQualityConformanceSpecificationTitle': 'isoQualityConformanceSpecificationTitle1', 

110 'isoQualityLineageSource': 'isoQualityLineageSource1', 

111 'isoQualityLineageSourceScale': 1000, 

112 'isoQualityLineageStatement': 'isoQualityLineageStatement1', 

113 'isoRestrictionCode': 'copyright', 

114 'isoServiceFunction': 'information', 

115 'isoScope': 'dataset', 

116 'isoScopeName': 'isoScopeName1', 

117 'isoSpatialRepresentationType': 'vector', 

118 'isoTopicCategories': ['boundaries', 'farming'], 

119 'isoSpatialResolution': 123, 

120} 

121 

122 

123## 

124 

125 

126def _config_defaults(): 

127 return f""" 

128 database.providers+ {{  

129 uid "GWS_TEST_POSTGRES_PROVIDER"  

130 type "postgres"  

131 host {OPTIONS['service.postgres.host']!r} 

132 port {OPTIONS['service.postgres.port']!r} 

133 username {OPTIONS['service.postgres.user']!r} 

134 password {OPTIONS['service.postgres.password']!r} 

135 database {OPTIONS['service.postgres.database']!r} 

136 schemaCacheLifeTime 0  

137 }} 

138 """ 

139 

140 

141def _to_data(x): 

142 if isinstance(x, gws.Data): 

143 for k, v in vars(x).items(): 

144 setattr(x, k, _to_data(v)) 

145 return x 

146 if isinstance(x, dict): 

147 d = gws.Data() 

148 for k, v in x.items(): 

149 setattr(d, k, _to_data(v)) 

150 return d 

151 if isinstance(x, list): 

152 return [_to_data(y) for y in x] 

153 if isinstance(x, tuple): 

154 return tuple(_to_data(y) for y in x) 

155 return x 

156 

157 

158_GWS_SPEC_RUNTIME = None 

159 

160 

161def gws_specs() -> gws.SpecRuntime: 

162 global _GWS_SPEC_RUNTIME 

163 

164 if _GWS_SPEC_RUNTIME is None: 

165 base = option('BASE_DIR') 

166 _GWS_SPEC_RUNTIME = gws.spec.runtime.create(f'{base}/config/MANIFEST.json', read_cache=False, write_cache=False) 

167 

168 return _GWS_SPEC_RUNTIME 

169 

170 

171def gws_root(cfg: str = '', specs: gws.SpecRuntime = None, activate=True, defaults=True, **vars): 

172 cfg = cfg or '' 

173 if defaults: 

174 cfg = _config_defaults() + '\n' + cfg 

175 

176 cfg = f'server.log.level {gws.log.get_level()}\n' + cfg 

177 

178 for k, v in vars.items(): 

179 cfg = cfg.replace('{' + k + '}', str(v)) 

180 

181 parsed_config = _to_data(gws.lib.vendor.slon.parse(cfg, as_object=True)) 

182 specs = mock.register(specs or gws_specs()) 

183 root = gws.config.initialize(specs, gws.Config(parsed_config)) 

184 

185 if root.configErrors: 

186 for err in root.configErrors: 

187 gws.log.error(f'CONFIGURATION ERROR: {err}') 

188 raise gws.ConfigurationError('config failed') 

189 

190 if not activate: 

191 return root 

192 

193 root = gws.config.activate(root) 

194 return root 

195 

196 

197def gws_system_user(): 

198 return gws.base.auth.user.SystemUser(None, roles=[]) 

199 

200 

201def get_db(root) -> gws.DatabaseProvider: 

202 return root.get('GWS_TEST_POSTGRES_PROVIDER') 

203 

204 

205## 

206 

207# ref: https://werkzeug.palletsprojects.com/en/3.0.x/test/ 

208 

209 

210class TestResponse(werkzeug.test.TestResponse): 

211 cookies: dict[str, werkzeug.test.Cookie] 

212 

213 

214def _wz_request(root, **kwargs): 

215 client = werkzeug.test.Client(gws.base.web.wsgi_app.make_application(root)) 

216 

217 cookies = cast(list[werkzeug.test.Cookie], kwargs.pop('cookies', [])) 

218 for c in cookies: 

219 client.set_cookie( 

220 key=c.key, 

221 value=c.value, 

222 max_age=c.max_age, 

223 expires=c.expires, 

224 path=c.path, 

225 domain=c.domain, 

226 secure=c.secure, 

227 httponly=c.http_only, 

228 ) 

229 

230 res = client.open(**kwargs) 

231 

232 # for some reason, responses do not include cookies, work around this 

233 res.cookies = {c.key: c for c in (client._cookies or {}).values()} 

234 return res 

235 

236 

237class http: 

238 @classmethod 

239 def get(cls, root, url, **kwargs) -> TestResponse: 

240 url = re.sub(r'\s+', '', url.strip()) 

241 url = '/' + url.strip('/') 

242 return _wz_request(root, method='GET', path=url, **kwargs) 

243 

244 @classmethod 

245 def api(cls, root, cmd, request=None, **kwargs) -> TestResponse: 

246 path = gws.c.SERVER_ENDPOINT 

247 if cmd: 

248 path += '/' + cmd 

249 return _wz_request(root, method='POST', path=path, json=request or {}, **kwargs) 

250 

251 

252## 

253 

254 

255class pg: 

256 saEngine: Optional[sa.Engine] = None 

257 

258 @classmethod 

259 def connect(cls): 

260 # kwargs.setdefault('pool_pre_ping', True) 

261 # kwargs.setdefault('echo', self.root.app.developer_option('db.engine_echo')) 

262 

263 if not cls.saEngine: 

264 cls.saEngine = sa.create_engine(cls.url(), poolclass=sa.NullPool) 

265 

266 return cls.saEngine.connect() 

267 

268 @classmethod 

269 def url(cls): 

270 return gws.lib.net.make_url( 

271 scheme='postgresql', 

272 username=OPTIONS['service.postgres.user'], 

273 password=OPTIONS['service.postgres.password'], 

274 hostname=OPTIONS['service.postgres.host'], 

275 port=OPTIONS['service.postgres.port'], 

276 path=OPTIONS['service.postgres.database'], 

277 ) 

278 

279 @classmethod 

280 def create_schema(cls, name: str): 

281 with cls.connect() as conn: 

282 conn.execute(sa.text(f'DROP SCHEMA IF EXISTS {name} CASCADE')) 

283 conn.execute(sa.text(f'CREATE SCHEMA {name}')) 

284 conn.commit() 

285 

286 @classmethod 

287 def create(cls, table_name: str, col_defs: dict): 

288 with cls.connect() as conn: 

289 conn.execute(sa.text(f'DROP TABLE IF EXISTS {table_name} CASCADE')) 

290 ddl = _comma(f'{k} {v}' for k, v in col_defs.items()) 

291 conn.execute(sa.text(f'CREATE TABLE {table_name} ( {ddl} )')) 

292 conn.commit() 

293 

294 @classmethod 

295 def clear(cls, table_name: str): 

296 with cls.connect() as conn: 

297 conn.execute(sa.text(f'TRUNCATE TABLE {table_name}')) 

298 conn.commit() 

299 

300 @classmethod 

301 def insert(cls, table_name: str, row_dicts: list[dict]): 

302 with cls.connect() as conn: 

303 conn.execute(sa.text(f'TRUNCATE TABLE {table_name}')) 

304 if row_dicts: 

305 names = _comma(k for k in row_dicts[0]) 

306 values = _comma(':' + k for k in row_dicts[0]) 

307 ins = sa.text(f'INSERT INTO {table_name} ( {names} ) VALUES( {values} )') 

308 conn.execute(ins, row_dicts) 

309 conn.commit() 

310 

311 @classmethod 

312 def rows(cls, sql: str) -> list[tuple]: 

313 with cls.connect() as conn: 

314 return [tuple(r) for r in conn.execute(sa.text(sql))] 

315 

316 @classmethod 

317 def content(cls, sql_or_table_name: str) -> list[tuple]: 

318 if not sql_or_table_name.lower().startswith('select'): 

319 sql_or_table_name = f'SELECT * FROM {sql_or_table_name}' 

320 return cls.rows(sql_or_table_name) 

321 

322 @classmethod 

323 def exec(cls, sql: str, **kwargs): 

324 with cls.connect() as conn: 

325 for s in sql.split(';'): 

326 if s.strip(): 

327 conn.execute(sa.text(s.strip()), kwargs) 

328 conn.commit() 

329 

330 @classmethod 

331 def connections(cls): 

332 with cls.connect() as conn: 

333 mark = gws.u.random_string(16) 

334 sql = f""" 

335 SELECT  

336 *, 

337 '{mark}' AS _mark 

338 FROM  

339 pg_stat_activity 

340 WHERE  

341 backend_type = 'client backend' 

342 """ 

343 

344 rs = [] 

345 for r in conn.execute(sa.text(sql)): 

346 r = r._asdict() 

347 r.pop('_mark', None) 

348 q = r.get('query', '') 

349 if mark in q: 

350 continue 

351 rs.append(r) 

352 return rs 

353 

354 

355## 

356 

357 

358class log: 

359 _buf = [] 

360 

361 @classmethod 

362 def write(cls, s): 

363 cls._buf.append(s) 

364 

365 @classmethod 

366 def reset(cls): 

367 cls._buf = [] 

368 

369 @classmethod 

370 def get(cls): 

371 r = cls._buf 

372 cls._buf = [] 

373 return r 

374 

375 

376## 

377 

378 

379def feature_from_dict(model, atts) -> gws.Feature: 

380 f = gws.base.feature.new(model=model, record=gws.FeatureRecord(attributes=atts)) 

381 f.attributes = atts 

382 return f 

383 

384 

385def feature(model, **atts) -> gws.Feature: 

386 return feature_from_dict(model, atts) 

387 

388 

389def ewkb(wkt: str, srid=3857): 

390 shape = gws.base.shape.from_wkt(wkt, default_crs=gws.lib.crs.get(srid)) 

391 return shape.to_ewkb() 

392 

393 

394def model_context(**kwargs): 

395 kwargs.setdefault('op', gws.ModelOperation.read) 

396 kwargs.setdefault('user', gws_system_user()) 

397 return gws.ModelContext(**kwargs) 

398 

399 

400## 

401 

402 

403class mockserver: 

404 @classmethod 

405 def add(cls, text): 

406 cls.post('__add', data=text) 

407 

408 @classmethod 

409 def set(cls, text): 

410 cls.post('__set', data=text) 

411 

412 @classmethod 

413 def reset(cls): 

414 cls.post('__del') 

415 

416 @classmethod 

417 def post(cls, verb, data=''): 

418 requests.post(cls.url(verb), data=data) 

419 

420 @classmethod 

421 def url(cls, path=''): 

422 h = OPTIONS.get('service.mockserver.host') 

423 p = OPTIONS.get('service.mockserver.port') 

424 u = f'http://{h}:{p}' 

425 if path: 

426 u += '/' + path 

427 return u 

428 

429 

430## 

431 

432 

433def fxml(s): 

434 """Format XML for easier comparison in tests.""" 

435 s = re.sub(r'\s+', ' ', s.strip()) 

436 s = re.sub(r'\s*(<|>)\s*', r'\1', s) 

437 s = s.replace('<', '\n<').replace('>', '>\n').replace('\n\n', '\n') 

438 return s.strip() 

439 

440 

441def is_close(a, b, rel_tol=1e-9, abs_tol=0.0): 

442 if isinstance(a, (int, float)): 

443 return math.isclose(a, b, rel_tol=rel_tol, abs_tol=abs_tol) 

444 return len(a) == len(b) and all(math.isclose(x, y, rel_tol=rel_tol, abs_tol=abs_tol) for x, y in zip(a, b)) 

445 

446 

447## 

448 

449 

450def unlink(path): 

451 if os.path.isfile(path): 

452 os.unlink(path) 

453 return 

454 if os.path.isdir(path): 

455 for de in os.scandir(path): 

456 unlink(de.path) 

457 os.rmdir(path) 

458 

459 

460def ensure_dir(path, clear=False): 

461 def _clear(d): 

462 for de in os.scandir(d): 

463 if de.is_dir(): 

464 _clear(de.path) 

465 os.rmdir(de.path) 

466 else: 

467 os.unlink(de.path) 

468 

469 os.makedirs(path, exist_ok=True) 

470 if clear: 

471 _clear(path) 

472 

473 

474def path_in_base_dir(path): 

475 base = option('BASE_DIR') 

476 return f'{base}/{path}' 

477 

478 

479@contextlib.contextmanager 

480def temp_file_in_base_dir(content='', keep=False): 

481 # for exec/chmod tests, which cannot use tmp_path 

482 

483 base = option('BASE_DIR') 

484 d = f'{base}/tmp' 

485 ensure_dir(d) 

486 

487 p = d + '/' + gws.u.random_string(16) 

488 gws.u.write_file(p, content) 

489 yield p 

490 

491 if not keep: 

492 unlink(p) 

493 

494 

495@contextlib.contextmanager 

496def temp_dir_in_base_dir(keep=False): 

497 base = option('BASE_DIR') 

498 

499 d = f'{base}/tmp/' + gws.u.random_string(16) 

500 ensure_dir(d, clear=True) 

501 yield d 

502 

503 if not keep: 

504 unlink(d) 

505 

506 

507def dump(s): 

508 print('----') 

509 print(s) 

510 print('-----') 

511 

512 

513_comma = ','.join