Coverage for gws-app/gws/base/database/_test/model_test.py: 100%

59 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1import os 

2import gws.config 

3import gws.base.feature 

4import gws.lib.sa as sa 

5import gws.test.util as u 

6 

7 

8## 

9 

10_weird_names = [ 

11 'Weird.Schema.. ""äöü"" !', 

12 'Weird.Table.. ""äöü"" !', 

13 'Weird.Column.. ""äöü"" !', 

14] 

15 

16@u.fixture(scope='module') 

17def root(): 

18 u.pg.create('plain', {'id': 'int primary key', 'a': 'text'}) 

19 u.pg.create('auto_pk', {'id': 'serial primary key', 'a': 'text'}) 

20 u.pg.create('no_pk', {'id': 'int', 'a': 'text'}) 

21 

22 ws, wt, wc = _weird_names 

23 ddl = f''' 

24 drop schema if exists "{ws}" cascade; 

25 create schema "{ws}"; 

26 create table "{ws}"."{wt}" ( 

27 id integer primary key, 

28 "{wc}" text 

29 ) 

30 ''' 

31 

32 conn = u.pg.connect() 

33 for d in ddl.split(';'): 

34 conn.execute(sa.text(d)) 

35 conn.commit() 

36 

37 cfg = f''' 

38 models+ {{  

39 uid "PLAIN" type "postgres" tableName "plain"  

40 }} 

41 models+ {{ 

42 uid "AUTO_PK" type "postgres" tableName "auto_pk" 

43 }} 

44 models+ {{ 

45 uid "NO_PK" type "postgres" tableName "no_pk" 

46 }} 

47 models+ {{ 

48 uid "WEIRD" type "postgres" tableName ' "{ws}"."{wt}" ' 

49 }} 

50 ''' 

51 

52 yield u.gws_root(cfg) 

53 

54 

55## 

56 

57def test_get_features(root: gws.Root): 

58 mc = gws.ModelContext( 

59 user=u.gws_system_user(), 

60 op=gws.ModelOperation.read, 

61 ) 

62 mo = u.cast(gws.Model, root.get('PLAIN')) 

63 

64 u.pg.insert('plain', [ 

65 dict(id=1, a='11'), 

66 dict(id=2, a='22'), 

67 dict(id=3, a='33'), 

68 dict(id=4, a='44'), 

69 ]) 

70 

71 fs = mo.get_features([2, 3], mc) 

72 assert [f.get('a') for f in fs] == ['22', '33'] 

73 

74 

75## 

76 

77def test_create_with_explicit_pk(root: gws.Root): 

78 mc = gws.ModelContext( 

79 user=u.gws_system_user(), 

80 ) 

81 mo = u.cast(gws.Model, root.get('PLAIN')) 

82 

83 u.pg.clear('plain') 

84 

85 mo.create_feature(u.feature(mo, id=15, a='aa'), mc) 

86 mo.create_feature(u.feature(mo, id=16, a='bb'), mc) 

87 mo.create_feature(u.feature(mo, id=17, a='cc'), mc) 

88 

89 assert u.pg.content('select id, a from plain') == [ 

90 (15, 'aa'), 

91 (16, 'bb'), 

92 (17, 'cc'), 

93 ] 

94 

95 

96def test_create_with_auto_pk(root: gws.Root): 

97 mc = gws.ModelContext( 

98 user=u.gws_system_user(), 

99 ) 

100 mo = u.cast(gws.Model, root.get('AUTO_PK')) 

101 

102 u.pg.clear('auto_pk') 

103 

104 mo.create_feature(u.feature(mo, a='aa'), mc) 

105 mo.create_feature(u.feature(mo, a='bb'), mc) 

106 mo.create_feature(u.feature(mo, a='cc'), mc) 

107 

108 assert u.pg.content('auto_pk') == [ 

109 (1, 'aa'), 

110 (2, 'bb'), 

111 (3, 'cc'), 

112 ] 

113 

114def test_create_no_pk(root: gws.Root): 

115 mc = gws.ModelContext( 

116 user=u.gws_system_user(), 

117 ) 

118 mo = u.cast(gws.Model, root.get('NO_PK')) 

119 

120 u.pg.clear('no_pk') 

121 

122 mo.create_feature(u.feature(mo, id=11, a='aa'), mc) 

123 mo.create_feature(u.feature(mo, id=22, a='bb'), mc) 

124 mo.create_feature(u.feature(mo, id=33, a='cc'), mc) 

125 

126 assert u.pg.content('no_pk') == [ 

127 (11, 'aa'), 

128 (22, 'bb'), 

129 (33, 'cc'), 

130 ] 

131 

132def test_weird_names(root: gws.Root): 

133 mc = gws.ModelContext( 

134 user=u.gws_system_user(), 

135 ) 

136 mo = u.cast(gws.Model, root.get('WEIRD')) 

137 

138 ws, wt, wc = _weird_names 

139 wc_noquot = wc.replace('""', '"') 

140 

141 u.pg.clear(f'"{ws}"."{wt}"') 

142 

143 mo.create_feature(u.feature_from_dict(mo, {'id': 15, wc_noquot: 'aa'}), mc) 

144 mo.create_feature(u.feature_from_dict(mo, {'id': 16, wc_noquot: 'bb'}), mc) 

145 mo.create_feature(u.feature_from_dict(mo, {'id': 17, wc_noquot: 'cc'}), mc) 

146 

147 assert u.pg.content(f'select id, "{wc}" from "{ws}"."{wt}"') == [ 

148 (15, 'aa'), 

149 (16, 'bb'), 

150 (17, 'cc'), 

151 ]