Coverage for gws-app/gws/test/util.py: 86%
273 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
1"""Test utilities."""
3import contextlib
4import os
5import re
6import typing
7from typing import Optional
9import pytest
10import requests
11import werkzeug.test
13import gws
14import gws.base.auth
15import gws.base.feature
16import gws.base.shape
17import gws.base.web.wsgi_app
18import gws.config
19import gws.lib.crs
20import gws.lib.cli
21import gws.lib.jsonx
22import gws.lib.net
23import gws.lib.sa as sa
24import gws.lib.vendor.slon
25import gws.spec.runtime
26import gws.test.mock
28##
# Convenience aliases re-exported from this module.
mock = gws.test.mock

fixture = pytest.fixture
raises = pytest.raises

monkey_patch = pytest.MonkeyPatch.context

cast = typing.cast

# NOTE: this shadows the builtin `exec` in this module's namespace.
exec = gws.lib.cli.exec

##

# Test options; rebound to the loaded options dict by `load_options()`.
OPTIONS = {}
def option(name, default=None):
    """Return the test option ``name``, or ``default`` if it is not set."""
    if name in OPTIONS:
        return OPTIONS[name]
    return default
def load_options(base_dir):
    """Load the test options from ``{base_dir}/config/OPTIONS.json`` into ``OPTIONS``.

    Also stores ``base_dir`` under the ``BASE_DIR`` key and (re)creates an empty
    ``{base_dir}/tmp`` directory.

    When not running in a container, every ``*.host`` option is set to
    ``'localhost'`` and every ``*.port`` option is replaced by the value of the
    sibling ``*.expose_port`` option.

    Args:
        base_dir: Base directory of the test environment.

    Raises:
        KeyError: If a ``*.port`` option has no matching ``*.expose_port`` option.
    """
    global OPTIONS

    OPTIONS = gws.lib.jsonx.from_path(f'{base_dir}/config/OPTIONS.json')
    OPTIONS['BASE_DIR'] = base_dir

    ensure_dir(OPTIONS['BASE_DIR'] + '/tmp', clear=True)

    if not gws.env.GWS_IN_CONTAINER:
        # if we are not in a container, use 'localhost:exposed_port' for all services
        # (iterate over a snapshot of the keys, since values are reassigned below)
        for k in list(OPTIONS):
            if k.endswith('.host'):
                OPTIONS[k] = 'localhost'
            if k.endswith('.port'):
                # Swap only the trailing '.port': str.replace would also rewrite
                # an earlier '.port' inside the key (e.g. 'service.portal.port').
                OPTIONS[k] = OPTIONS[k[: -len('.port')] + '.expose_port']
67##
# Default metadata values; most entries are numbered placeholder strings.
# NOTE(review): presumably used as baseline metadata in tests — confirm against callers.
DEFAULT_METADATA = {
    'language': 'de',
    #
    'contactAddress': 'contactAddress 123',
    'contactAddressType': 'postal',
    'contactArea': 'contactArea1',
    'contactCity': 'contactCity1',
    'contactCountry': 'contactCountry1',
    'contactEmail': 'contact@example.de',
    'contactFax': '+49 (0)123-45',
    'contactOrganization': 'contactOrganization1',
    'contactPerson': 'contactPerson1',
    'contactPhone': '+49 (0)123-45',
    'contactPosition': 'contactPosition1',
    'contactProviderName': 'contactProviderName1',
    'contactProviderSite': 'contactProviderSite1',
    'contactRole': 'pointOfContact',
    'contactUrl': 'https://contact.@example.de',
    'contactZip': '12345',
    #
    'catalogUid': 'catalogUid1',
    'temporalBegin': '2021-01-02',
    'temporalEnd': '2022-01-02',
    'dateCreated': '2023-01-02',
    'dateUpdated': '2024-01-02',
    #
    # 'inspireMandatoryKeyword': 'infoMapAccessService',
    'inspireDegreeOfConformity': 'conformant',
    'inspireResourceType': 'service',
    # 'inspireSpatialDataServiceType': 'view',
    'inspireSpatialScope': 'national',
    'inspireSpatialScopeName': 'national',
    'inspireTheme': 'administrativeUnits',
    #
    'isoMaintenanceFrequencyCode': 'annual',
    'isoQualityConformanceExplanation': 'isoQualityConformanceExplanation1',
    'isoQualityConformanceQualityPass': True,
    'isoQualityConformanceSpecificationDate': '2020-01-02',
    'isoQualityConformanceSpecificationTitle': 'isoQualityConformanceSpecificationTitle1',
    'isoQualityLineageSource': 'isoQualityLineageSource1',
    'isoQualityLineageSourceScale': 1000,
    'isoQualityLineageStatement': 'isoQualityLineageStatement1',
    'isoRestrictionCode': 'copyright',
    'isoServiceFunction': 'information',
    'isoScope': 'dataset',
    'isoScopeName': 'isoScopeName1',
    'isoSpatialRepresentationType': 'vector',
    'isoTopicCategories': ['boundaries', 'farming'],
    'isoSpatialResolution': 123,
}
122##
def _config_defaults():
    """Return the default configuration text prepended by ``gws_root``.

    The text declares a ``postgres`` database provider with the uid
    ``GWS_TEST_POSTGRES_PROVIDER``; its connection settings are read from the
    ``service.postgres.*`` entries of ``OPTIONS``.

    Raises:
        KeyError: If one of the ``service.postgres.*`` options is missing.
    """
    # doubled braces are literal braces in the f-string; `!r` inserts each option's repr()
    return f"""
        database.providers+ {{
            uid "GWS_TEST_POSTGRES_PROVIDER"
            type "postgres"
            host {OPTIONS['service.postgres.host']!r}
            port {OPTIONS['service.postgres.port']!r}
            username {OPTIONS['service.postgres.user']!r}
            password {OPTIONS['service.postgres.password']!r}
            database {OPTIONS['service.postgres.database']!r}
            schemaCacheLifeTime 0
        }}
    """
def _to_data(x):
    """Recursively convert dicts nested in ``x`` to ``gws.Data`` objects.

    - a ``gws.Data`` object is converted in place (its attributes are replaced) and returned;
    - a dict becomes a new ``gws.Data`` with one attribute per key;
    - lists and tuples are rebuilt as new plain lists/tuples with converted items;
    - anything else is returned unchanged.
    """
    if isinstance(x, gws.Data):
        for attr_name, attr_value in vars(x).items():
            setattr(x, attr_name, _to_data(attr_value))
        return x

    if isinstance(x, dict):
        data = gws.Data()
        for key, value in x.items():
            setattr(data, key, _to_data(value))
        return data

    if isinstance(x, list):
        return list(map(_to_data, x))

    if isinstance(x, tuple):
        return tuple(map(_to_data, x))

    return x
# Spec runtime created on first use by `gws_specs()` and reused afterwards.
_GWS_SPEC_RUNTIME = None


def gws_specs() -> gws.SpecRuntime:
    """Return the spec runtime, creating it on the first call.

    The runtime is built from ``{BASE_DIR}/config/MANIFEST.json`` with the
    spec cache neither read nor written.
    """
    global _GWS_SPEC_RUNTIME

    if _GWS_SPEC_RUNTIME is not None:
        return _GWS_SPEC_RUNTIME

    base_dir = option('BASE_DIR')
    manifest_path = f'{base_dir}/config/MANIFEST.json'
    _GWS_SPEC_RUNTIME = gws.spec.runtime.create(manifest_path, read_cache=False, write_cache=False)
    return _GWS_SPEC_RUNTIME
def gws_root(cfg: str = '', specs: gws.SpecRuntime = None, activate=True, defaults=True, **vars):
    """Build a root object from configuration text.

    Args:
        cfg: Configuration text (parsed with ``gws.lib.vendor.slon``).
        specs: Spec runtime to use; ``gws_specs()`` is used if omitted.
        activate: If true, the initialized root is activated before it is returned.
        defaults: If true, the text from ``_config_defaults()`` is prepended to ``cfg``.
        **vars: Each ``{name}`` placeholder in the final text is replaced by ``str(value)``.

    Returns:
        The initialized (and, if requested, activated) root object.

    Raises:
        gws.ConfigurationError: If initialization reports configuration errors.
    """
    body = cfg or ''
    if defaults:
        body = '\n'.join((_config_defaults(), body))

    text = '\n'.join((f'server.log.level {gws.log.get_level()}', body))

    for name, value in vars.items():
        text = text.replace(f'{{{name}}}', str(value))

    parsed_config = _to_data(gws.lib.vendor.slon.parse(text, as_object=True))
    registered_specs = mock.register(specs or gws_specs())
    root = gws.config.initialize(registered_specs, gws.Config(parsed_config))

    errors = root.configErrors
    if errors:
        for err in errors:
            gws.log.error(f'CONFIGURATION ERROR: {err}')
        raise gws.ConfigurationError('config failed')

    return gws.config.activate(root) if activate else root
def gws_system_user():
    """Return a new ``SystemUser`` created without a provider and with no roles."""
    user = gws.base.auth.user.SystemUser(None, roles=[])
    return user
def get_db(root) -> gws.DatabaseProvider:
    """Look up the object with the uid ``GWS_TEST_POSTGRES_PROVIDER`` in ``root``."""
    provider = root.get('GWS_TEST_POSTGRES_PROVIDER')
    return provider
204##
206# ref: https://werkzeug.palletsprojects.com/en/3.0.x/test/
class TestResponse(werkzeug.test.TestResponse):
    """werkzeug test response, annotated with the ``cookies`` attribute set by ``_wz_request``."""

    # cookie key -> cookie object; assigned by `_wz_request` after the request completes
    cookies: dict[str, werkzeug.test.Cookie]
def _wz_request(root, **kwargs):
    """Run one request against the WSGI application built from ``root``.

    Args:
        root: Root object passed to ``gws.base.web.wsgi_app.make_application``.
        **kwargs: Arguments for the werkzeug test client's ``open()``. The extra
            ``cookies`` argument is consumed here: it may be a list of
            ``werkzeug.test.Cookie`` objects, a mapping whose values are such
            cookies (the shape of ``TestResponse.cookies``), or ``None``.

    Returns:
        The response, with a ``cookies`` attribute mapping cookie keys to the
        cookies held by the client after the request.
    """
    client = werkzeug.test.Client(gws.base.web.wsgi_app.make_application(root))

    # Treat a missing or None value as "no cookies" instead of failing in the loop.
    cookies = kwargs.pop('cookies', None) or []
    if isinstance(cookies, dict):
        # Iterating a dict yields its keys; the cookie objects are its values.
        cookies = cookies.values()

    for c in cookies:
        client.set_cookie(
            key=c.key,
            value=c.value,
            max_age=c.max_age,
            expires=c.expires,
            path=c.path,
            domain=c.domain,
            secure=c.secure,
            httponly=c.http_only,
        )

    res = client.open(**kwargs)

    # for some reason, responses do not include cookies, work around this
    res.cookies = {c.key: c for c in (client._cookies or {}).values()}
    return res
class http:
    """Shortcuts for issuing test requests through ``_wz_request``."""

    @classmethod
    def get(cls, root, url, **kwargs) -> TestResponse:
        """Send a GET request.

        All whitespace is removed from ``url`` and it is normalized to have
        exactly one leading slash and no trailing slash.
        """
        compact = re.sub(r'\s+', '', url.strip())
        request_path = '/' + compact.strip('/')
        return _wz_request(root, method='GET', path=request_path, **kwargs)

    @classmethod
    def api(cls, root, cmd, request=None, **kwargs) -> TestResponse:
        """POST ``request`` (or an empty object) as JSON to the server endpoint.

        If ``cmd`` is non-empty, it is appended to the endpoint path.
        """
        endpoint = gws.c.SERVER_ENDPOINT
        request_path = endpoint + '/' + cmd if cmd else endpoint
        payload = request or {}
        return _wz_request(root, method='POST', path=request_path, json=payload, **kwargs)
251##
class pg:
    """Helpers for the test PostgreSQL database configured by the ``service.postgres.*`` options.

    SQL is assembled by string interpolation; table, schema and column names
    are inserted into statements verbatim.
    """

    # engine created on the first `connect()` call and reused afterwards
    saEngine: Optional[sa.Engine] = None

    @classmethod
    def connect(cls):
        """Return a new connection, creating the shared engine (with ``NullPool``) on first use."""
        if not cls.saEngine:
            cls.saEngine = sa.create_engine(cls.url(), poolclass=sa.NullPool)

        return cls.saEngine.connect()

    @classmethod
    def url(cls):
        """Build the ``postgresql`` connection URL from the ``service.postgres.*`` options."""
        return gws.lib.net.make_url(
            scheme='postgresql',
            username=OPTIONS['service.postgres.user'],
            password=OPTIONS['service.postgres.password'],
            hostname=OPTIONS['service.postgres.host'],
            port=OPTIONS['service.postgres.port'],
            path=OPTIONS['service.postgres.database'],
        )

    @classmethod
    def create_schema(cls, name: str):
        """Drop the schema ``name`` (with CASCADE) if it exists, then create it."""
        with cls.connect() as conn:
            conn.execute(sa.text(f'DROP SCHEMA IF EXISTS {name} CASCADE'))
            conn.execute(sa.text(f'CREATE SCHEMA {name}'))
            conn.commit()

    @classmethod
    def create(cls, table_name: str, col_defs: dict):
        """Drop the table (with CASCADE) if it exists, then create it.

        Args:
            table_name: Table name.
            col_defs: Mapping of column name to its SQL definition.
        """
        with cls.connect() as conn:
            conn.execute(sa.text(f'DROP TABLE IF EXISTS {table_name} CASCADE'))
            ddl = _comma(f'{k} {v}' for k, v in col_defs.items())
            conn.execute(sa.text(f'CREATE TABLE {table_name} ( {ddl} )'))
            conn.commit()

    @classmethod
    def clear(cls, table_name: str):
        """Truncate the table."""
        with cls.connect() as conn:
            conn.execute(sa.text(f'TRUNCATE TABLE {table_name}'))
            conn.commit()

    @classmethod
    def insert(cls, table_name: str, row_dicts: list[dict]):
        """Replace the table content with ``row_dicts``.

        The table is truncated first, even when ``row_dicts`` is empty.
        Column names are taken from the first row.
        """
        with cls.connect() as conn:
            conn.execute(sa.text(f'TRUNCATE TABLE {table_name}'))
            if row_dicts:
                first_row = row_dicts[0]
                names = _comma(first_row)
                values = _comma(':' + k for k in first_row)
                ins = sa.text(f'INSERT INTO {table_name} ( {names} ) VALUES( {values} )')
                conn.execute(ins, row_dicts)
            conn.commit()

    @classmethod
    def rows(cls, sql: str) -> list[tuple]:
        """Execute ``sql`` and return the result rows as tuples."""
        with cls.connect() as conn:
            return [tuple(r) for r in conn.execute(sa.text(sql))]

    @classmethod
    def content(cls, sql_or_table_name: str) -> list[tuple]:
        """Return the rows of a SELECT statement, or all rows of a table.

        The argument is treated as SQL when its first word is ``select``
        (case-insensitive, leading whitespace allowed); otherwise it is taken
        as a table name and ``SELECT * FROM`` is prepended.
        """
        # Match the whole first word, so that leading whitespace/newlines are
        # tolerated and table names like 'select_items' are not mistaken for SQL.
        if re.match(r'\s*select\b', sql_or_table_name, re.IGNORECASE):
            return cls.rows(sql_or_table_name)
        return cls.rows(f'SELECT * FROM {sql_or_table_name}')

    @classmethod
    def exec(cls, sql: str, **kwargs):
        """Execute the ``;``-separated statements in ``sql`` and commit.

        ``kwargs`` are passed as bind parameters to every statement. The text
        is split on every ``;``, so statements containing a literal semicolon
        are not supported.
        """
        with cls.connect() as conn:
            for s in sql.split(';'):
                statement = s.strip()
                if statement:
                    conn.execute(sa.text(statement), kwargs)
            conn.commit()

    @classmethod
    def connections(cls):
        """Return the ``pg_stat_activity`` rows of client backends as dicts.

        The row whose query text is this monitoring query itself is excluded.
        """
        with cls.connect() as conn:
            # A random marker embedded in the query text identifies our own row.
            mark = gws.u.random_string(16)
            sql = f"""
                SELECT
                    *,
                    '{mark}' AS _mark
                FROM
                    pg_stat_activity
                WHERE
                    backend_type = 'client backend'
            """

            rs = []
            for r in conn.execute(sa.text(sql)):
                r = r._asdict()
                r.pop('_mark', None)
                # the query column may be NULL; treat it as empty text
                q = r.get('query') or ''
                if mark in q:
                    continue
                rs.append(r)
            return rs
354##
class log:
    """In-memory buffer that collects written items until they are fetched or reset."""

    # items written since the last `get()` or `reset()`
    _buf = []

    @classmethod
    def write(cls, s):
        """Append ``s`` to the buffer."""
        cls._buf.append(s)

    @classmethod
    def reset(cls):
        """Discard everything written so far."""
        cls._buf = list()

    @classmethod
    def get(cls):
        """Return the buffered items and start a new, empty buffer."""
        collected, cls._buf = cls._buf, []
        return collected
375##
def feature_from_dict(model, atts) -> gws.Feature:
    """Create a feature for ``model`` whose record and ``attributes`` are both set from ``atts``."""
    record = gws.FeatureRecord(attributes=atts)
    feat = gws.base.feature.new(model=model, record=record)
    feat.attributes = atts
    return feat
def feature(model, **atts) -> gws.Feature:
    """Keyword-argument form of ``feature_from_dict``: attributes are given as ``name=value``."""
    attributes = atts
    return feature_from_dict(model, attributes)
def ewkb(wkt: str, srid=3857):
    """Convert WKT text to EWKB, using the CRS for ``srid`` as the default CRS."""
    default_crs = gws.lib.crs.get(srid)
    return gws.base.shape.from_wkt(wkt, default_crs=default_crs).to_ewkb()
def model_context(**kwargs):
    """Create a ``gws.ModelContext`` from ``kwargs``.

    ``op`` defaults to ``gws.ModelOperation.read`` and ``user`` to a system user.
    """
    fallbacks = (
        ('op', gws.ModelOperation.read),
        ('user', gws_system_user()),
    )
    for key, value in fallbacks:
        kwargs.setdefault(key, value)
    return gws.ModelContext(**kwargs)
399##
class mockserver:
    """Client helpers for the mock server addressed by the ``service.mockserver.*`` options."""

    @classmethod
    def add(cls, text):
        """POST ``text`` to the ``__add`` path."""
        cls.post('__add', data=text)

    @classmethod
    def set(cls, text):
        """POST ``text`` to the ``__set`` path."""
        cls.post('__set', data=text)

    @classmethod
    def reset(cls):
        """POST an empty body to the ``__del`` path."""
        cls.post('__del')

    @classmethod
    def post(cls, verb, data=''):
        """POST ``data`` to the server path ``verb``; the response is ignored."""
        target = cls.url(verb)
        requests.post(target, data=data)

    @classmethod
    def url(cls, path=''):
        """Return the server base URL, followed by ``/path`` when ``path`` is non-empty."""
        host = OPTIONS.get('service.mockserver.host')
        port = OPTIONS.get('service.mockserver.port')
        base_url = f'http://{host}:{port}'
        if not path:
            return base_url
        return base_url + '/' + path
429##
def fxml(s):
    """Normalize XML text so that each tag and each text run is on its own line.

    Whitespace runs are collapsed to one space, whitespace next to ``<`` and
    ``>`` is removed, and a line break is placed before every ``<`` and after
    every ``>``.
    """
    collapsed = re.sub(r'\s+', ' ', s.strip())
    tight = re.sub(r'\s*([<>])\s*', r'\1', collapsed)
    broken = tight.replace('<', '\n<').replace('>', '>\n')
    # adjacent tags produce a double line break; reduce it to a single one
    return broken.replace('\n\n', '\n').strip()
440##
def unlink(path):
    """Remove a file, a symlink, or a directory tree; do nothing if ``path`` does not exist.

    Symbolic links are removed themselves and never followed, so the content
    of a linked directory is left untouched and dangling links are removed too.

    Args:
        path: File system path to remove.
    """
    # Check for links first: isfile/isdir follow links, so a link to a directory
    # would otherwise be descended into, and a dangling link would be skipped
    # (making the rmdir of its parent fail).
    if os.path.islink(path) or os.path.isfile(path):
        os.unlink(path)
        return

    if os.path.isdir(path):
        with os.scandir(path) as entries:
            children = [de.path for de in entries]
        for child in children:
            unlink(child)
        os.rmdir(path)
def ensure_dir(path, clear=False):
    """Create the directory ``path`` (with parents) if needed, optionally emptying it.

    Args:
        path: Directory path.
        clear: If true, remove everything inside the directory. Symbolic links
            are removed themselves; the directories they point to are not touched.
    """

    def _clear(d):
        with os.scandir(d) as entries:
            children = list(entries)
        for de in children:
            # Do not follow links: a link to a directory must be unlinked, not
            # descended into (and rmdir on a link fails anyway).
            if de.is_dir(follow_symlinks=False):
                _clear(de.path)
                os.rmdir(de.path)
            else:
                os.unlink(de.path)

    os.makedirs(path, exist_ok=True)
    if clear:
        _clear(path)
def path_in_base_dir(path):
    """Return ``path`` prefixed with the ``BASE_DIR`` option and a slash."""
    return f"{option('BASE_DIR')}/{path}"
@contextlib.contextmanager
def temp_file_in_base_dir(content='', keep=False):
    """Context manager that creates a randomly named file in ``{BASE_DIR}/tmp``.

    Args:
        content: Content written to the file.
        keep: If true, the file is not removed on exit.

    Yields:
        The path of the created file.
    """
    # for exec/chmod tests, which cannot use tmp_path

    base = option('BASE_DIR')
    d = f'{base}/tmp'
    ensure_dir(d)

    p = d + '/' + gws.u.random_string(16)
    gws.u.write_file(p, content)
    try:
        yield p
    finally:
        # clean up even when the body of the `with` statement raises
        if not keep:
            unlink(p)
@contextlib.contextmanager
def temp_dir_in_base_dir(keep=False):
    """Context manager that creates an empty, randomly named directory in ``{BASE_DIR}/tmp``.

    Args:
        keep: If true, the directory is not removed on exit.

    Yields:
        The path of the created directory.
    """
    base = option('BASE_DIR')

    d = f'{base}/tmp/' + gws.u.random_string(16)
    ensure_dir(d, clear=True)
    try:
        yield d
    finally:
        # clean up even when the body of the `with` statement raises
        if not keep:
            unlink(d)
def dump(s):
    """Print ``s`` between two dashed separator lines."""
    print('----', s, '-----', sep='\n')
# Join an iterable of strings with commas (no space after the comma).
_comma = ','.join