Coverage for gws-app / gws / test / util.py: 86%
278 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Test utilities."""
3import contextlib
4import math
5import os
6import re
7import typing
8from typing import Optional
10import pytest
11import requests
12import werkzeug.test
14import gws
15import gws.base.auth
16import gws.base.feature
17import gws.base.shape
18import gws.base.web.wsgi_app
19import gws.config
20import gws.lib.crs
21import gws.lib.cli
22import gws.lib.jsonx
23import gws.lib.net
24import gws.lib.sa as sa
25import gws.lib.vendor.slon
26import gws.spec.runtime
27import gws.test.mock
29##
31mock = gws.test.mock
33fixture = pytest.fixture
34raises = pytest.raises
36monkey_patch = pytest.MonkeyPatch.context
38cast = typing.cast
40exec = gws.lib.cli.exec
42##
44OPTIONS = {}
47def option(name, default=None):
48 return OPTIONS.get(name, default)
51def load_options(base_dir):
52 global OPTIONS
54 OPTIONS = gws.lib.jsonx.from_path(f'{base_dir}/config/OPTIONS.json')
55 OPTIONS['BASE_DIR'] = base_dir
57 ensure_dir(OPTIONS['BASE_DIR'] + '/tmp', clear=True)
59 if not gws.env.GWS_IN_CONTAINER:
60 # if we are not in a container, use 'localhost:exposed_port' for all services
61 for k, v in OPTIONS.items():
62 if k.endswith('.host'):
63 OPTIONS[k] = 'localhost'
64 if k.endswith('.port'):
65 OPTIONS[k] = OPTIONS[k.replace('.port', '.expose_port')]
68##
71DEFAULT_METADATA = {
72 'language': 'de',
73 #
74 'contactAddress': 'contactAddress 123',
75 'contactAddressType': 'postal',
76 'contactArea': 'contactArea1',
77 'contactCity': 'contactCity1',
78 'contactCountry': 'contactCountry1',
79 'contactEmail': 'contact@example.de',
80 'contactFax': '+49 (0)123-45',
81 'contactOrganization': 'contactOrganization1',
82 'contactPerson': 'contactPerson1',
83 'contactPhone': '+49 (0)123-45',
84 'contactPosition': 'contactPosition1',
85 'contactProviderName': 'contactProviderName1',
86 'contactProviderSite': 'contactProviderSite1',
87 'contactRole': 'pointOfContact',
88 'contactUrl': 'https://contact.@example.de',
89 'contactZip': '12345',
90 #
91 'catalogUid': 'catalogUid1',
92 'temporalBegin': '2021-01-02',
93 'temporalEnd': '2022-01-02',
94 'dateCreated': '2023-01-02',
95 'dateUpdated': '2024-01-02',
96 #
97 # 'inspireMandatoryKeyword': 'infoMapAccessService',
98 'inspireDegreeOfConformity': 'conformant',
99 'inspireResourceType': 'service',
100 # 'inspireSpatialDataServiceType': 'view',
101 'inspireSpatialScope': 'national',
102 'inspireSpatialScopeName': 'national',
103 'inspireTheme': 'administrativeUnits',
104 #
105 'isoMaintenanceFrequencyCode': 'annual',
106 'isoQualityConformanceExplanation': 'isoQualityConformanceExplanation1',
107 'isoQualityConformanceQualityPass': True,
108 'isoQualityConformanceSpecificationDate': '2020-01-02',
109 'isoQualityConformanceSpecificationTitle': 'isoQualityConformanceSpecificationTitle1',
110 'isoQualityLineageSource': 'isoQualityLineageSource1',
111 'isoQualityLineageSourceScale': 1000,
112 'isoQualityLineageStatement': 'isoQualityLineageStatement1',
113 'isoRestrictionCode': 'copyright',
114 'isoServiceFunction': 'information',
115 'isoScope': 'dataset',
116 'isoScopeName': 'isoScopeName1',
117 'isoSpatialRepresentationType': 'vector',
118 'isoTopicCategories': ['boundaries', 'farming'],
119 'isoSpatialResolution': 123,
120}
123##
126def _config_defaults():
127 return f"""
128 database.providers+ {{
129 uid "GWS_TEST_POSTGRES_PROVIDER"
130 type "postgres"
131 host {OPTIONS['service.postgres.host']!r}
132 port {OPTIONS['service.postgres.port']!r}
133 username {OPTIONS['service.postgres.user']!r}
134 password {OPTIONS['service.postgres.password']!r}
135 database {OPTIONS['service.postgres.database']!r}
136 schemaCacheLifeTime 0
137 }}
138 """
141def _to_data(x):
142 if isinstance(x, gws.Data):
143 for k, v in vars(x).items():
144 setattr(x, k, _to_data(v))
145 return x
146 if isinstance(x, dict):
147 d = gws.Data()
148 for k, v in x.items():
149 setattr(d, k, _to_data(v))
150 return d
151 if isinstance(x, list):
152 return [_to_data(y) for y in x]
153 if isinstance(x, tuple):
154 return tuple(_to_data(y) for y in x)
155 return x
158_GWS_SPEC_RUNTIME = None
161def gws_specs() -> gws.SpecRuntime:
162 global _GWS_SPEC_RUNTIME
164 if _GWS_SPEC_RUNTIME is None:
165 base = option('BASE_DIR')
166 _GWS_SPEC_RUNTIME = gws.spec.runtime.create(f'{base}/config/MANIFEST.json', read_cache=False, write_cache=False)
168 return _GWS_SPEC_RUNTIME
171def gws_root(cfg: str = '', specs: gws.SpecRuntime = None, activate=True, defaults=True, **vars):
172 cfg = cfg or ''
173 if defaults:
174 cfg = _config_defaults() + '\n' + cfg
176 cfg = f'server.log.level {gws.log.get_level()}\n' + cfg
178 for k, v in vars.items():
179 cfg = cfg.replace('{' + k + '}', str(v))
181 parsed_config = _to_data(gws.lib.vendor.slon.parse(cfg, as_object=True))
182 specs = mock.register(specs or gws_specs())
183 root = gws.config.initialize(specs, gws.Config(parsed_config))
185 if root.configErrors:
186 for err in root.configErrors:
187 gws.log.error(f'CONFIGURATION ERROR: {err}')
188 raise gws.ConfigurationError('config failed')
190 if not activate:
191 return root
193 root = gws.config.activate(root)
194 return root
197def gws_system_user():
198 return gws.base.auth.user.SystemUser(None, roles=[])
201def get_db(root) -> gws.DatabaseProvider:
202 return root.get('GWS_TEST_POSTGRES_PROVIDER')
205##
207# ref: https://werkzeug.palletsprojects.com/en/3.0.x/test/
210class TestResponse(werkzeug.test.TestResponse):
211 cookies: dict[str, werkzeug.test.Cookie]
214def _wz_request(root, **kwargs):
215 client = werkzeug.test.Client(gws.base.web.wsgi_app.make_application(root))
217 cookies = cast(list[werkzeug.test.Cookie], kwargs.pop('cookies', []))
218 for c in cookies:
219 client.set_cookie(
220 key=c.key,
221 value=c.value,
222 max_age=c.max_age,
223 expires=c.expires,
224 path=c.path,
225 domain=c.domain,
226 secure=c.secure,
227 httponly=c.http_only,
228 )
230 res = client.open(**kwargs)
232 # for some reason, responses do not include cookies, work around this
233 res.cookies = {c.key: c for c in (client._cookies or {}).values()}
234 return res
237class http:
238 @classmethod
239 def get(cls, root, url, **kwargs) -> TestResponse:
240 url = re.sub(r'\s+', '', url.strip())
241 url = '/' + url.strip('/')
242 return _wz_request(root, method='GET', path=url, **kwargs)
244 @classmethod
245 def api(cls, root, cmd, request=None, **kwargs) -> TestResponse:
246 path = gws.c.SERVER_ENDPOINT
247 if cmd:
248 path += '/' + cmd
249 return _wz_request(root, method='POST', path=path, json=request or {}, **kwargs)
252##
255class pg:
256 saEngine: Optional[sa.Engine] = None
258 @classmethod
259 def connect(cls):
260 # kwargs.setdefault('pool_pre_ping', True)
261 # kwargs.setdefault('echo', self.root.app.developer_option('db.engine_echo'))
263 if not cls.saEngine:
264 cls.saEngine = sa.create_engine(cls.url(), poolclass=sa.NullPool)
266 return cls.saEngine.connect()
268 @classmethod
269 def url(cls):
270 return gws.lib.net.make_url(
271 scheme='postgresql',
272 username=OPTIONS['service.postgres.user'],
273 password=OPTIONS['service.postgres.password'],
274 hostname=OPTIONS['service.postgres.host'],
275 port=OPTIONS['service.postgres.port'],
276 path=OPTIONS['service.postgres.database'],
277 )
279 @classmethod
280 def create_schema(cls, name: str):
281 with cls.connect() as conn:
282 conn.execute(sa.text(f'DROP SCHEMA IF EXISTS {name} CASCADE'))
283 conn.execute(sa.text(f'CREATE SCHEMA {name}'))
284 conn.commit()
286 @classmethod
287 def create(cls, table_name: str, col_defs: dict):
288 with cls.connect() as conn:
289 conn.execute(sa.text(f'DROP TABLE IF EXISTS {table_name} CASCADE'))
290 ddl = _comma(f'{k} {v}' for k, v in col_defs.items())
291 conn.execute(sa.text(f'CREATE TABLE {table_name} ( {ddl} )'))
292 conn.commit()
294 @classmethod
295 def clear(cls, table_name: str):
296 with cls.connect() as conn:
297 conn.execute(sa.text(f'TRUNCATE TABLE {table_name}'))
298 conn.commit()
300 @classmethod
301 def insert(cls, table_name: str, row_dicts: list[dict]):
302 with cls.connect() as conn:
303 conn.execute(sa.text(f'TRUNCATE TABLE {table_name}'))
304 if row_dicts:
305 names = _comma(k for k in row_dicts[0])
306 values = _comma(':' + k for k in row_dicts[0])
307 ins = sa.text(f'INSERT INTO {table_name} ( {names} ) VALUES( {values} )')
308 conn.execute(ins, row_dicts)
309 conn.commit()
311 @classmethod
312 def rows(cls, sql: str) -> list[tuple]:
313 with cls.connect() as conn:
314 return [tuple(r) for r in conn.execute(sa.text(sql))]
316 @classmethod
317 def content(cls, sql_or_table_name: str) -> list[tuple]:
318 if not sql_or_table_name.lower().startswith('select'):
319 sql_or_table_name = f'SELECT * FROM {sql_or_table_name}'
320 return cls.rows(sql_or_table_name)
322 @classmethod
323 def exec(cls, sql: str, **kwargs):
324 with cls.connect() as conn:
325 for s in sql.split(';'):
326 if s.strip():
327 conn.execute(sa.text(s.strip()), kwargs)
328 conn.commit()
330 @classmethod
331 def connections(cls):
332 with cls.connect() as conn:
333 mark = gws.u.random_string(16)
334 sql = f"""
335 SELECT
336 *,
337 '{mark}' AS _mark
338 FROM
339 pg_stat_activity
340 WHERE
341 backend_type = 'client backend'
342 """
344 rs = []
345 for r in conn.execute(sa.text(sql)):
346 r = r._asdict()
347 r.pop('_mark', None)
348 q = r.get('query', '')
349 if mark in q:
350 continue
351 rs.append(r)
352 return rs
355##
358class log:
359 _buf = []
361 @classmethod
362 def write(cls, s):
363 cls._buf.append(s)
365 @classmethod
366 def reset(cls):
367 cls._buf = []
369 @classmethod
370 def get(cls):
371 r = cls._buf
372 cls._buf = []
373 return r
376##
379def feature_from_dict(model, atts) -> gws.Feature:
380 f = gws.base.feature.new(model=model, record=gws.FeatureRecord(attributes=atts))
381 f.attributes = atts
382 return f
385def feature(model, **atts) -> gws.Feature:
386 return feature_from_dict(model, atts)
389def ewkb(wkt: str, srid=3857):
390 shape = gws.base.shape.from_wkt(wkt, default_crs=gws.lib.crs.get(srid))
391 return shape.to_ewkb()
394def model_context(**kwargs):
395 kwargs.setdefault('op', gws.ModelOperation.read)
396 kwargs.setdefault('user', gws_system_user())
397 return gws.ModelContext(**kwargs)
400##
403class mockserver:
404 @classmethod
405 def add(cls, text):
406 cls.post('__add', data=text)
408 @classmethod
409 def set(cls, text):
410 cls.post('__set', data=text)
412 @classmethod
413 def reset(cls):
414 cls.post('__del')
416 @classmethod
417 def post(cls, verb, data=''):
418 requests.post(cls.url(verb), data=data)
420 @classmethod
421 def url(cls, path=''):
422 h = OPTIONS.get('service.mockserver.host')
423 p = OPTIONS.get('service.mockserver.port')
424 u = f'http://{h}:{p}'
425 if path:
426 u += '/' + path
427 return u
430##
433def fxml(s):
434 """Format XML for easier comparison in tests."""
435 s = re.sub(r'\s+', ' ', s.strip())
436 s = re.sub(r'\s*(<|>)\s*', r'\1', s)
437 s = s.replace('<', '\n<').replace('>', '>\n').replace('\n\n', '\n')
438 return s.strip()
441def is_close(a, b, rel_tol=1e-9, abs_tol=0.0):
442 if isinstance(a, (int, float)):
443 return math.isclose(a, b, rel_tol=rel_tol, abs_tol=abs_tol)
444 return len(a) == len(b) and all(math.isclose(x, y, rel_tol=rel_tol, abs_tol=abs_tol) for x, y in zip(a, b))
447##
450def unlink(path):
451 if os.path.isfile(path):
452 os.unlink(path)
453 return
454 if os.path.isdir(path):
455 for de in os.scandir(path):
456 unlink(de.path)
457 os.rmdir(path)
460def ensure_dir(path, clear=False):
461 def _clear(d):
462 for de in os.scandir(d):
463 if de.is_dir():
464 _clear(de.path)
465 os.rmdir(de.path)
466 else:
467 os.unlink(de.path)
469 os.makedirs(path, exist_ok=True)
470 if clear:
471 _clear(path)
474def path_in_base_dir(path):
475 base = option('BASE_DIR')
476 return f'{base}/{path}'
479@contextlib.contextmanager
480def temp_file_in_base_dir(content='', keep=False):
481 # for exec/chmod tests, which cannot use tmp_path
483 base = option('BASE_DIR')
484 d = f'{base}/tmp'
485 ensure_dir(d)
487 p = d + '/' + gws.u.random_string(16)
488 gws.u.write_file(p, content)
489 yield p
491 if not keep:
492 unlink(p)
495@contextlib.contextmanager
496def temp_dir_in_base_dir(keep=False):
497 base = option('BASE_DIR')
499 d = f'{base}/tmp/' + gws.u.random_string(16)
500 ensure_dir(d, clear=True)
501 yield d
503 if not keep:
504 unlink(d)
507def dump(s):
508 print('----')
509 print(s)
510 print('-----')
513_comma = ','.join