Coverage for gws-app/gws/base/database/_test/model_test.py: 100%
59 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1import os
2import gws.config
3import gws.base.feature
4import gws.lib.sa as sa
5import gws.test.util as u
8##
10_weird_names = [
11 'Weird.Schema.. ""äöü"" !',
12 'Weird.Table.. ""äöü"" !',
13 'Weird.Column.. ""äöü"" !',
14]
16@u.fixture(scope='module')
17def root():
18 u.pg.create('plain', {'id': 'int primary key', 'a': 'text'})
19 u.pg.create('auto_pk', {'id': 'serial primary key', 'a': 'text'})
20 u.pg.create('no_pk', {'id': 'int', 'a': 'text'})
22 ws, wt, wc = _weird_names
23 ddl = f'''
24 drop schema if exists "{ws}" cascade;
25 create schema "{ws}";
26 create table "{ws}"."{wt}" (
27 id integer primary key,
28 "{wc}" text
29 )
30 '''
32 conn = u.pg.connect()
33 for d in ddl.split(';'):
34 conn.execute(sa.text(d))
35 conn.commit()
37 cfg = f'''
38 models+ {{
39 uid "PLAIN" type "postgres" tableName "plain"
40 }}
41 models+ {{
42 uid "AUTO_PK" type "postgres" tableName "auto_pk"
43 }}
44 models+ {{
45 uid "NO_PK" type "postgres" tableName "no_pk"
46 }}
47 models+ {{
48 uid "WEIRD" type "postgres" tableName ' "{ws}"."{wt}" '
49 }}
50 '''
52 yield u.gws_root(cfg)
55##
57def test_get_features(root: gws.Root):
58 mc = gws.ModelContext(
59 user=u.gws_system_user(),
60 op=gws.ModelOperation.read,
61 )
62 mo = u.cast(gws.Model, root.get('PLAIN'))
64 u.pg.insert('plain', [
65 dict(id=1, a='11'),
66 dict(id=2, a='22'),
67 dict(id=3, a='33'),
68 dict(id=4, a='44'),
69 ])
71 fs = mo.get_features([2, 3], mc)
72 assert [f.get('a') for f in fs] == ['22', '33']
75##
77def test_create_with_explicit_pk(root: gws.Root):
78 mc = gws.ModelContext(
79 user=u.gws_system_user(),
80 )
81 mo = u.cast(gws.Model, root.get('PLAIN'))
83 u.pg.clear('plain')
85 mo.create_feature(u.feature(mo, id=15, a='aa'), mc)
86 mo.create_feature(u.feature(mo, id=16, a='bb'), mc)
87 mo.create_feature(u.feature(mo, id=17, a='cc'), mc)
89 assert u.pg.content('select id, a from plain') == [
90 (15, 'aa'),
91 (16, 'bb'),
92 (17, 'cc'),
93 ]
96def test_create_with_auto_pk(root: gws.Root):
97 mc = gws.ModelContext(
98 user=u.gws_system_user(),
99 )
100 mo = u.cast(gws.Model, root.get('AUTO_PK'))
102 u.pg.clear('auto_pk')
104 mo.create_feature(u.feature(mo, a='aa'), mc)
105 mo.create_feature(u.feature(mo, a='bb'), mc)
106 mo.create_feature(u.feature(mo, a='cc'), mc)
108 assert u.pg.content('auto_pk') == [
109 (1, 'aa'),
110 (2, 'bb'),
111 (3, 'cc'),
112 ]
114def test_create_no_pk(root: gws.Root):
115 mc = gws.ModelContext(
116 user=u.gws_system_user(),
117 )
118 mo = u.cast(gws.Model, root.get('NO_PK'))
120 u.pg.clear('no_pk')
122 mo.create_feature(u.feature(mo, id=11, a='aa'), mc)
123 mo.create_feature(u.feature(mo, id=22, a='bb'), mc)
124 mo.create_feature(u.feature(mo, id=33, a='cc'), mc)
126 assert u.pg.content('no_pk') == [
127 (11, 'aa'),
128 (22, 'bb'),
129 (33, 'cc'),
130 ]
132def test_weird_names(root: gws.Root):
133 mc = gws.ModelContext(
134 user=u.gws_system_user(),
135 )
136 mo = u.cast(gws.Model, root.get('WEIRD'))
138 ws, wt, wc = _weird_names
139 wc_noquot = wc.replace('""', '"')
141 u.pg.clear(f'"{ws}"."{wt}"')
143 mo.create_feature(u.feature_from_dict(mo, {'id': 15, wc_noquot: 'aa'}), mc)
144 mo.create_feature(u.feature_from_dict(mo, {'id': 16, wc_noquot: 'bb'}), mc)
145 mo.create_feature(u.feature_from_dict(mo, {'id': 17, wc_noquot: 'cc'}), mc)
147 assert u.pg.content(f'select id, "{wc}" from "{ws}"."{wt}"') == [
148 (15, 'aa'),
149 (16, 'bb'),
150 (17, 'cc'),
151 ]