Coverage for gws-app/gws/base/database/model.py: 83%

162 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 23:09 +0200

1"""Database-based models.""" 

2 

3from typing import Optional 

4 

5import gws 

6import gws.base.feature 

7import gws.base.model 

8import gws.config.util 

9import gws.lib.sa as sa 

10 

11 

class Config(gws.base.model.Config):
    """Configuration for the database model.

    All options are optional at the configuration level; the model object
    itself refuses to configure when no table name can be determined.
    """

    dbUid: Optional[str]
    """Database provider uid."""

    tableName: Optional[str]
    """Table name for the model."""

    sqlFilter: Optional[str]
    """Extra SQL filter."""

21 

22 

class Props(gws.base.model.Props):
    """Properties of the database model; adds nothing to the base model props."""

25 

26 

class Object(gws.base.model.Object, gws.DatabaseModel):
    """Model whose features are rows of one database table.

    All database access goes through ``self.db``; statements are built with
    the constructs exposed by ``gws.lib.sa``.
    """

    def configure(self):
        """Read the table name and the optional SQL filter from the configuration.

        Raises:
            gws.ConfigurationError: If neither ``tableName`` nor
                ``_defaultTableName`` yields a table name.
        """
        self.tableName = self.cfg('tableName') or self.cfg('_defaultTableName')
        if not self.tableName:
            raise gws.ConfigurationError(f'table name missing in model {self!r}')

        self.sqlFilter = self.cfg('sqlFilter')
        self.configure_model()

    def configure_provider(self):
        """Configure the database provider via ``gws.config.util``."""
        return gws.config.util.configure_database_provider_for(self)

    ##

    def describe(self):
        """Return what the database provider reports for this model's table."""
        return self.db.describe(self.tableName)

    def table(self):
        """Return the table object for this model's table name."""
        return self.db.table(self.tableName)

    def column(self, column_name):
        """Return the column ``column_name`` of this model's table."""
        return self.db.column(self.table(), column_name)

    def uid_column(self):
        """Return the column that holds the feature uid.

        Raises:
            gws.Error: If the model has no ``uidName`` or the table has no
                column of that name.
        """
        if not self.uidName:
            raise gws.Error(f'no primary key found for table {self.tableName!r}')

        # Resolve the table once and reuse it for both the check and the lookup.
        table = self.table()
        if not self.db.has_column(table, self.uidName):
            raise gws.Error(f'invalid primary key {self.uidName!r} for table {self.tableName!r}')
        return self.db.column(table, self.uidName)

    ##

    def find_features(self, search, mc):
        """Select features matching ``search``.

        The given context is copied, so the caller's ``mc`` is not modified.
        Each field gets a ``before_select`` call before the statement is built
        and an ``after_select`` call once the features are loaded.

        Returns:
            A list of features; empty when ``build_select`` yields no statement.

        Raises:
            gws.ForbiddenError: If the context user may not read this model.
        """
        if not mc.user.can_read(self):
            raise gws.ForbiddenError(f'model {self.uid!r} can_read=False')

        mc = gws.base.model.copy_context(mc)
        mc.search = search
        mc.dbSelect = gws.ModelSelectBuild(
            columns=[],
            geometryWhere=[],
            keywordWhere=[],
            order=[],
            where=[],
        )

        # The outer connect() spans the field hooks as well as the query itself.
        with self.db.connect():
            for fld in self.fields:
                fld.before_select(mc)

            sql = self.build_select(mc)
            if sql is None:
                return []

            features = self.fetch_features(sql)

            for fld in self.fields:
                fld.after_select(features, mc)

            return features

    def fetch_features(self, select):
        """Execute ``select`` and wrap every row in a feature of this model."""
        with self.db.connect() as conn:
            return [
                gws.base.feature.new(
                    model=self,
                    record=gws.FeatureRecord(attributes=row),
                )
                for row in conn.fetch_all(select)
            ]

    def build_select(self, mc):
        """Build the select statement for the search stored in ``mc``.

        Combines the sort order, uid / keyword / geometry conditions, extra
        conditions, the configured ``sqlFilter`` and the requested columns.

        Returns:
            The select statement, or ``None`` when the search asks for
            something this model cannot filter by (uids without a primary key,
            a keyword without keyword conditions, a shape without geometry
            conditions).
        """
        # @TODO sorting should be handled on the field level
        for s in mc.search.sort or self.defaultSort or []:
            direction = sa.desc if s.reverse else sa.asc
            mc.dbSelect.order.append(direction(self.column(s.fieldName)))

        sel = sa.select().select_from(self.table())

        if mc.search.uids:
            if not self.uidName:
                gws.log.debug(f'build_select: {self}: no primary key for {self.tableName=}')
                return None
            sel = sel.where(self.uid_column().in_(mc.search.uids))

        if mc.search.keyword and not mc.dbSelect.keywordWhere:
            gws.log.debug(f'build_select: {self}: no keyword where')
            return None
        if mc.dbSelect.keywordWhere:
            sel = sel.where(sa.or_(*mc.dbSelect.keywordWhere))

        if mc.search.shape and not mc.dbSelect.geometryWhere:
            gws.log.debug(f'build_select: {self}: no geometry where')
            return None
        if mc.dbSelect.geometryWhere:
            sel = sel.where(sa.or_(*mc.dbSelect.geometryWhere))

        sel = sel.where(*mc.dbSelect.where)
        for w in mc.search.extraWhere or []:
            sel = sel.where(w)

        if self.sqlFilter:
            # The filter is raw SQL from the configuration; the parentheses keep
            # it a single condition when combined with the others.
            sel = sel.where(sa.text('(' + self.sqlFilter + ')'))

        # Collect model columns first, then extra columns, dropping repeats.
        # Identity comparison is deliberate: `==` on SQL column expressions is
        # presumably overloaded to build SQL rather than compare objects.
        cols = []
        for col in [*(mc.dbSelect.columns or []), *(mc.search.extraColumns or [])]:
            if not any(col is c for c in cols):
                cols.append(col)

        sel = sel.add_columns(*cols)

        if mc.dbSelect.order:
            sel = sel.order_by(*mc.dbSelect.order)

        return sel

    ##

    def init_feature(self, feature, mc):
        """Initialize a new feature by running the ``do_init`` hooks.

        Raises:
            gws.ForbiddenError: If the context user may not create in this model.
        """
        if not mc.user.can_create(self):
            raise gws.ForbiddenError(f'model {self.uid!r} can_create=False')

        for fld in self.fields:
            fld.do_init(feature, mc)

        # NOTE(review): unlike create_feature, related models are not
        # de-duplicated here, so a model shared by several related features
        # gets its hooks called once per feature - confirm this is intended.
        for rf in feature.createWithFeatures:
            for fld in rf.model.fields:
                fld.do_init_related(feature, mc)

    def create_feature(self, feature, mc):
        """Insert ``feature`` as a new row and return its primary key.

        The feature record is reset, the ``before_create*`` hooks run, the
        record attributes are inserted, the ``after_create*`` hooks run and the
        connection is committed.

        Returns:
            The inserted primary key, or ``None`` if the database reported none.

        Raises:
            gws.ForbiddenError: If the context user may not create in this model.
            gws.Error: If the insert reports a composite primary key.
        """
        if not mc.user.can_create(self):
            raise gws.ForbiddenError(f'model {self.uid!r} can_create=False')

        feature.record = gws.FeatureRecord(attributes={}, meta={})

        # Distinct models of the related features, in first-seen order.
        related_models = []
        for from_feature in feature.createWithFeatures:
            if from_feature.model not in related_models:
                related_models.append(from_feature.model)

        with self.db.connect() as conn:
            for m in related_models:
                for fld in m.fields:
                    fld.before_create_related(feature, mc)

            for fld in self.fields:
                fld.before_create(feature, mc)

            sql = sa.insert(self.table())
            rs = conn.execute(sql, feature.record.attributes)
            pk = rs.inserted_primary_key
            if not pk:
                feature.insertedPrimaryKey = None
            elif len(pk) == 1:
                feature.insertedPrimaryKey = pk[0]
            else:
                raise gws.Error(f'composite primary keys not supported for {self.tableName!r}')

            for fld in self.fields:
                fld.after_create(feature, mc)

            for m in related_models:
                for fld in m.fields:
                    fld.after_create_related(feature, mc)

            conn.commit()

        return feature.insertedPrimaryKey

    def update_feature(self, feature, mc):
        """Update the row of ``feature`` and return the feature uid.

        The feature record is reset and the ``before_update`` hooks run. When
        the record attributes are still empty afterwards, nothing is written
        and the ``after_update`` hooks and the commit are skipped.

        Raises:
            gws.ForbiddenError: If the context user may not write to this model.
        """
        if not mc.user.can_write(self):
            raise gws.ForbiddenError(f'model {self.uid!r} can_write=False')

        feature.record = gws.FeatureRecord(attributes={}, meta={})

        with self.db.connect() as conn:
            for fld in self.fields:
                fld.before_update(feature, mc)

            if not feature.record.attributes:
                return feature.uid()

            sql = (
                self.table()
                .update()
                .where(self.uid_column() == feature.uid())
                .values(feature.record.attributes)
            )
            conn.execute(sql)

            for fld in self.fields:
                fld.after_update(feature, mc)

            conn.commit()

        return feature.uid()

    def delete_feature(self, feature, mc):
        """Delete the row of ``feature`` and return the feature uid.

        The ``before_delete`` and ``after_delete`` hooks run around the delete
        statement; the connection is committed afterwards.

        Raises:
            gws.ForbiddenError: If the context user may not delete from this model.
        """
        if not mc.user.can_delete(self):
            raise gws.ForbiddenError(f'model {self.uid!r} can_delete=False')

        with self.db.connect() as conn:
            for fld in self.fields:
                fld.before_delete(feature, mc)

            sql = sa.delete(self.table()).where(self.uid_column() == feature.uid())

            conn.execute(sql)

            for fld in self.fields:
                fld.after_delete(feature, mc)

            conn.commit()

        return feature.uid()