Coverage for gws-app/gws/test/util.py: 86%

273 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 23:09 +0200

1"""Test utilities.""" 

2 

3import contextlib 

4import os 

5import re 

6import typing 

7from typing import Optional 

8 

9import pytest 

10import requests 

11import werkzeug.test 

12 

13import gws 

14import gws.base.auth 

15import gws.base.feature 

16import gws.base.shape 

17import gws.base.web.wsgi_app 

18import gws.config 

19import gws.lib.crs 

20import gws.lib.cli 

21import gws.lib.jsonx 

22import gws.lib.net 

23import gws.lib.sa as sa 

24import gws.lib.vendor.slon 

25import gws.spec.runtime 

26import gws.test.mock 

27 

28## 

29 

30mock = gws.test.mock 

31 

32fixture = pytest.fixture 

33raises = pytest.raises 

34 

35monkey_patch = pytest.MonkeyPatch.context 

36 

37cast = typing.cast 

38 

39exec = gws.lib.cli.exec 

40 

41## 

42 

43OPTIONS = {} 

44 

45 

46def option(name, default=None): 

47 return OPTIONS.get(name, default) 

48 

49 

50def load_options(base_dir): 

51 global OPTIONS 

52 

53 OPTIONS = gws.lib.jsonx.from_path(f'{base_dir}/config/OPTIONS.json') 

54 OPTIONS['BASE_DIR'] = base_dir 

55 

56 ensure_dir(OPTIONS['BASE_DIR'] + '/tmp', clear=True) 

57 

58 if not gws.env.GWS_IN_CONTAINER: 

59 # if we are not in a container, use 'localhost:exposed_port' for all services 

60 for k, v in OPTIONS.items(): 

61 if k.endswith('.host'): 

62 OPTIONS[k] = 'localhost' 

63 if k.endswith('.port'): 

64 OPTIONS[k] = OPTIONS[k.replace('.port', '.expose_port')] 

65 

66 

67## 

68 

69 

70DEFAULT_METADATA = { 

71 'language': 'de', 

72 # 

73 'contactAddress': 'contactAddress 123', 

74 'contactAddressType': 'postal', 

75 'contactArea': 'contactArea1', 

76 'contactCity': 'contactCity1', 

77 'contactCountry': 'contactCountry1', 

78 'contactEmail': 'contact@example.de', 

79 'contactFax': '+49 (0)123-45', 

80 'contactOrganization': 'contactOrganization1', 

81 'contactPerson': 'contactPerson1', 

82 'contactPhone': '+49 (0)123-45', 

83 'contactPosition': 'contactPosition1', 

84 'contactProviderName': 'contactProviderName1', 

85 'contactProviderSite': 'contactProviderSite1', 

86 'contactRole': 'pointOfContact', 

87 'contactUrl': 'https://contact.@example.de', 

88 'contactZip': '12345', 

89 # 

90 'catalogUid': 'catalogUid1', 

91 'temporalBegin': '2021-01-02', 

92 'temporalEnd': '2022-01-02', 

93 'dateCreated': '2023-01-02', 

94 'dateUpdated': '2024-01-02', 

95 # 

96 # 'inspireMandatoryKeyword': 'infoMapAccessService', 

97 'inspireDegreeOfConformity': 'conformant', 

98 'inspireResourceType': 'service', 

99 # 'inspireSpatialDataServiceType': 'view', 

100 'inspireSpatialScope': 'national', 

101 'inspireSpatialScopeName': 'national', 

102 'inspireTheme': 'administrativeUnits', 

103 # 

104 'isoMaintenanceFrequencyCode': 'annual', 

105 'isoQualityConformanceExplanation': 'isoQualityConformanceExplanation1', 

106 'isoQualityConformanceQualityPass': True, 

107 'isoQualityConformanceSpecificationDate': '2020-01-02', 

108 'isoQualityConformanceSpecificationTitle': 'isoQualityConformanceSpecificationTitle1', 

109 'isoQualityLineageSource': 'isoQualityLineageSource1', 

110 'isoQualityLineageSourceScale': 1000, 

111 'isoQualityLineageStatement': 'isoQualityLineageStatement1', 

112 'isoRestrictionCode': 'copyright', 

113 'isoServiceFunction': 'information', 

114 'isoScope': 'dataset', 

115 'isoScopeName': 'isoScopeName1', 

116 'isoSpatialRepresentationType': 'vector', 

117 'isoTopicCategories': ['boundaries', 'farming'], 

118 'isoSpatialResolution': 123, 

119} 

120 

121 

122## 

123 

124 

125def _config_defaults(): 

126 return f""" 

127 database.providers+ {{  

128 uid "GWS_TEST_POSTGRES_PROVIDER"  

129 type "postgres"  

130 host {OPTIONS['service.postgres.host']!r} 

131 port {OPTIONS['service.postgres.port']!r} 

132 username {OPTIONS['service.postgres.user']!r} 

133 password {OPTIONS['service.postgres.password']!r} 

134 database {OPTIONS['service.postgres.database']!r} 

135 schemaCacheLifeTime 0  

136 }} 

137 """ 

138 

139 

140def _to_data(x): 

141 if isinstance(x, gws.Data): 

142 for k, v in vars(x).items(): 

143 setattr(x, k, _to_data(v)) 

144 return x 

145 if isinstance(x, dict): 

146 d = gws.Data() 

147 for k, v in x.items(): 

148 setattr(d, k, _to_data(v)) 

149 return d 

150 if isinstance(x, list): 

151 return [_to_data(y) for y in x] 

152 if isinstance(x, tuple): 

153 return tuple(_to_data(y) for y in x) 

154 return x 

155 

156 

157_GWS_SPEC_RUNTIME = None 

158 

159 

160def gws_specs() -> gws.SpecRuntime: 

161 global _GWS_SPEC_RUNTIME 

162 

163 if _GWS_SPEC_RUNTIME is None: 

164 base = option('BASE_DIR') 

165 _GWS_SPEC_RUNTIME = gws.spec.runtime.create(f'{base}/config/MANIFEST.json', read_cache=False, write_cache=False) 

166 

167 return _GWS_SPEC_RUNTIME 

168 

169 

170def gws_root(cfg: str = '', specs: gws.SpecRuntime = None, activate=True, defaults=True, **vars): 

171 cfg = cfg or '' 

172 if defaults: 

173 cfg = _config_defaults() + '\n' + cfg 

174 

175 cfg = f'server.log.level {gws.log.get_level()}\n' + cfg 

176 

177 for k, v in vars.items(): 

178 cfg = cfg.replace('{' + k + '}', str(v)) 

179 

180 parsed_config = _to_data(gws.lib.vendor.slon.parse(cfg, as_object=True)) 

181 specs = mock.register(specs or gws_specs()) 

182 root = gws.config.initialize(specs, gws.Config(parsed_config)) 

183 

184 if root.configErrors: 

185 for err in root.configErrors: 

186 gws.log.error(f'CONFIGURATION ERROR: {err}') 

187 raise gws.ConfigurationError('config failed') 

188 

189 if not activate: 

190 return root 

191 

192 root = gws.config.activate(root) 

193 return root 

194 

195 

196def gws_system_user(): 

197 return gws.base.auth.user.SystemUser(None, roles=[]) 

198 

199 

200def get_db(root) -> gws.DatabaseProvider: 

201 return root.get('GWS_TEST_POSTGRES_PROVIDER') 

202 

203 

204## 

205 

206# ref: https://werkzeug.palletsprojects.com/en/3.0.x/test/ 

207 

208 

209class TestResponse(werkzeug.test.TestResponse): 

210 cookies: dict[str, werkzeug.test.Cookie] 

211 

212 

213def _wz_request(root, **kwargs): 

214 client = werkzeug.test.Client(gws.base.web.wsgi_app.make_application(root)) 

215 

216 cookies = cast(list[werkzeug.test.Cookie], kwargs.pop('cookies', [])) 

217 for c in cookies: 

218 client.set_cookie( 

219 key=c.key, 

220 value=c.value, 

221 max_age=c.max_age, 

222 expires=c.expires, 

223 path=c.path, 

224 domain=c.domain, 

225 secure=c.secure, 

226 httponly=c.http_only, 

227 ) 

228 

229 res = client.open(**kwargs) 

230 

231 # for some reason, responses do not include cookies, work around this 

232 res.cookies = {c.key: c for c in (client._cookies or {}).values()} 

233 return res 

234 

235 

236class http: 

237 @classmethod 

238 def get(cls, root, url, **kwargs) -> TestResponse: 

239 url = re.sub(r'\s+', '', url.strip()) 

240 url = '/' + url.strip('/') 

241 return _wz_request(root, method='GET', path=url, **kwargs) 

242 

243 @classmethod 

244 def api(cls, root, cmd, request=None, **kwargs) -> TestResponse: 

245 path = gws.c.SERVER_ENDPOINT 

246 if cmd: 

247 path += '/' + cmd 

248 return _wz_request(root, method='POST', path=path, json=request or {}, **kwargs) 

249 

250 

251## 

252 

253 

254class pg: 

255 saEngine: Optional[sa.Engine] = None 

256 

257 @classmethod 

258 def connect(cls): 

259 # kwargs.setdefault('pool_pre_ping', True) 

260 # kwargs.setdefault('echo', self.root.app.developer_option('db.engine_echo')) 

261 

262 if not cls.saEngine: 

263 cls.saEngine = sa.create_engine(cls.url(), poolclass=sa.NullPool) 

264 

265 return cls.saEngine.connect() 

266 

267 @classmethod 

268 def url(cls): 

269 return gws.lib.net.make_url( 

270 scheme='postgresql', 

271 username=OPTIONS['service.postgres.user'], 

272 password=OPTIONS['service.postgres.password'], 

273 hostname=OPTIONS['service.postgres.host'], 

274 port=OPTIONS['service.postgres.port'], 

275 path=OPTIONS['service.postgres.database'], 

276 ) 

277 

278 @classmethod 

279 def create_schema(cls, name: str): 

280 with cls.connect() as conn: 

281 conn.execute(sa.text(f'DROP SCHEMA IF EXISTS {name} CASCADE')) 

282 conn.execute(sa.text(f'CREATE SCHEMA {name}')) 

283 conn.commit() 

284 

285 @classmethod 

286 def create(cls, table_name: str, col_defs: dict): 

287 with cls.connect() as conn: 

288 conn.execute(sa.text(f'DROP TABLE IF EXISTS {table_name} CASCADE')) 

289 ddl = _comma(f'{k} {v}' for k, v in col_defs.items()) 

290 conn.execute(sa.text(f'CREATE TABLE {table_name} ( {ddl} )')) 

291 conn.commit() 

292 

293 @classmethod 

294 def clear(cls, table_name: str): 

295 with cls.connect() as conn: 

296 conn.execute(sa.text(f'TRUNCATE TABLE {table_name}')) 

297 conn.commit() 

298 

299 @classmethod 

300 def insert(cls, table_name: str, row_dicts: list[dict]): 

301 with cls.connect() as conn: 

302 conn.execute(sa.text(f'TRUNCATE TABLE {table_name}')) 

303 if row_dicts: 

304 names = _comma(k for k in row_dicts[0]) 

305 values = _comma(':' + k for k in row_dicts[0]) 

306 ins = sa.text(f'INSERT INTO {table_name} ( {names} ) VALUES( {values} )') 

307 conn.execute(ins, row_dicts) 

308 conn.commit() 

309 

310 @classmethod 

311 def rows(cls, sql: str) -> list[tuple]: 

312 with cls.connect() as conn: 

313 return [tuple(r) for r in conn.execute(sa.text(sql))] 

314 

315 @classmethod 

316 def content(cls, sql_or_table_name: str) -> list[tuple]: 

317 if not sql_or_table_name.lower().startswith('select'): 

318 sql_or_table_name = f'SELECT * FROM {sql_or_table_name}' 

319 return cls.rows(sql_or_table_name) 

320 

321 @classmethod 

322 def exec(cls, sql: str, **kwargs): 

323 with cls.connect() as conn: 

324 for s in sql.split(';'): 

325 if s.strip(): 

326 conn.execute(sa.text(s.strip()), kwargs) 

327 conn.commit() 

328 

329 @classmethod 

330 def connections(cls): 

331 with cls.connect() as conn: 

332 mark = gws.u.random_string(16) 

333 sql = f""" 

334 SELECT  

335 *, 

336 '{mark}' AS _mark 

337 FROM  

338 pg_stat_activity 

339 WHERE  

340 backend_type = 'client backend' 

341 """ 

342 

343 rs = [] 

344 for r in conn.execute(sa.text(sql)): 

345 r = r._asdict() 

346 r.pop('_mark', None) 

347 q = r.get('query', '') 

348 if mark in q: 

349 continue 

350 rs.append(r) 

351 return rs 

352 

353 

354## 

355 

356 

357class log: 

358 _buf = [] 

359 

360 @classmethod 

361 def write(cls, s): 

362 cls._buf.append(s) 

363 

364 @classmethod 

365 def reset(cls): 

366 cls._buf = [] 

367 

368 @classmethod 

369 def get(cls): 

370 r = cls._buf 

371 cls._buf = [] 

372 return r 

373 

374 

375## 

376 

377 

378def feature_from_dict(model, atts) -> gws.Feature: 

379 f = gws.base.feature.new(model=model, record=gws.FeatureRecord(attributes=atts)) 

380 f.attributes = atts 

381 return f 

382 

383 

384def feature(model, **atts) -> gws.Feature: 

385 return feature_from_dict(model, atts) 

386 

387 

388def ewkb(wkt: str, srid=3857): 

389 shape = gws.base.shape.from_wkt(wkt, default_crs=gws.lib.crs.get(srid)) 

390 return shape.to_ewkb() 

391 

392 

393def model_context(**kwargs): 

394 kwargs.setdefault('op', gws.ModelOperation.read) 

395 kwargs.setdefault('user', gws_system_user()) 

396 return gws.ModelContext(**kwargs) 

397 

398 

399## 

400 

401 

402class mockserver: 

403 @classmethod 

404 def add(cls, text): 

405 cls.post('__add', data=text) 

406 

407 @classmethod 

408 def set(cls, text): 

409 cls.post('__set', data=text) 

410 

411 @classmethod 

412 def reset(cls): 

413 cls.post('__del') 

414 

415 @classmethod 

416 def post(cls, verb, data=''): 

417 requests.post(cls.url(verb), data=data) 

418 

419 @classmethod 

420 def url(cls, path=''): 

421 h = OPTIONS.get('service.mockserver.host') 

422 p = OPTIONS.get('service.mockserver.port') 

423 u = f'http://{h}:{p}' 

424 if path: 

425 u += '/' + path 

426 return u 

427 

428 

429## 

430 

431 

432def fxml(s): 

433 """Format XML for easier comparison in tests.""" 

434 s = re.sub(r'\s+', ' ', s.strip()) 

435 s = re.sub(r'\s*(<|>)\s*', r'\1', s) 

436 s = s.replace('<', '\n<').replace('>', '>\n').replace('\n\n', '\n') 

437 return s.strip() 

438 

439 

440## 

441 

442 

443def unlink(path): 

444 if os.path.isfile(path): 

445 os.unlink(path) 

446 return 

447 if os.path.isdir(path): 

448 for de in os.scandir(path): 

449 unlink(de.path) 

450 os.rmdir(path) 

451 

452 

453def ensure_dir(path, clear=False): 

454 def _clear(d): 

455 for de in os.scandir(d): 

456 if de.is_dir(): 

457 _clear(de.path) 

458 os.rmdir(de.path) 

459 else: 

460 os.unlink(de.path) 

461 

462 os.makedirs(path, exist_ok=True) 

463 if clear: 

464 _clear(path) 

465 

466 

467def path_in_base_dir(path): 

468 base = option('BASE_DIR') 

469 return f'{base}/{path}' 

470 

471 

472@contextlib.contextmanager 

473def temp_file_in_base_dir(content='', keep=False): 

474 # for exec/chmod tests, which cannot use tmp_path 

475 

476 base = option('BASE_DIR') 

477 d = f'{base}/tmp' 

478 ensure_dir(d) 

479 

480 p = d + '/' + gws.u.random_string(16) 

481 gws.u.write_file(p, content) 

482 yield p 

483 

484 if not keep: 

485 unlink(p) 

486 

487 

488@contextlib.contextmanager 

489def temp_dir_in_base_dir(keep=False): 

490 base = option('BASE_DIR') 

491 

492 d = f'{base}/tmp/' + gws.u.random_string(16) 

493 ensure_dir(d, clear=True) 

494 yield d 

495 

496 if not keep: 

497 unlink(d) 

498 

499 

500def dump(s): 

501 print('----') 

502 print(s) 

503 print('-----') 

504 

505 

506_comma = ','.join