Coverage for gws-app/gws/base/database/model.py: 83%
162 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
1"""Database-based models."""
3from typing import Optional
5import gws
6import gws.base.feature
7import gws.base.model
8import gws.config.util
9import gws.lib.sa as sa
class Config(gws.base.model.Config):
    """Configuration options of a database-backed model."""

    # Uid of the database provider the model reads from and writes to.
    dbUid: Optional[str]
    """Database provider uid."""

    # Name of the table the model maps to.
    tableName: Optional[str]
    """Table name for the model."""

    # Raw SQL condition; the model wraps it in parentheses and adds it
    # to the WHERE clause of the SELECT statements it builds.
    sqlFilter: Optional[str]
    """Extra SQL filter."""
class Props(gws.base.model.Props):
    """Properties of a database model; nothing is added to the base model properties."""
27class Object(gws.base.model.Object, gws.DatabaseModel):
28 def configure(self):
29 self.tableName = self.cfg('tableName') or self.cfg('_defaultTableName')
30 if not self.tableName:
31 raise gws.ConfigurationError(f'table name missing in model {self!r}')
33 self.sqlFilter = self.cfg('sqlFilter')
34 self.configure_model()
36 def configure_provider(self):
37 return gws.config.util.configure_database_provider_for(self)
39 ##
41 def describe(self):
42 return self.db.describe(self.tableName)
44 def table(self):
45 return self.db.table(self.tableName)
47 def column(self, column_name):
48 return self.db.column(self.table(), column_name)
50 def uid_column(self):
51 if not self.uidName:
52 raise gws.Error(f'no primary key found for table {self.tableName!r}')
53 if not self.db.has_column(self.table(), self.uidName):
54 raise gws.Error(f'invalid primary key {self.uidName!r} for table {self.tableName!r}')
55 return self.db.column(self.table(), self.uidName)
57 ##
59 def find_features(self, search, mc):
60 if not mc.user.can_read(self):
61 raise gws.ForbiddenError(f'model {self.uid!r} can_read=False')
63 mc = gws.base.model.copy_context(mc)
64 mc.search = search
65 mc.dbSelect = gws.ModelSelectBuild(
66 columns=[],
67 geometryWhere=[],
68 keywordWhere=[],
69 order=[],
70 where=[],
71 )
73 with self.db.connect():
74 for fld in self.fields:
75 fld.before_select(mc)
77 sql = self.build_select(mc)
78 if sql is None:
79 return []
81 features = self.fetch_features(sql)
83 for fld in self.fields:
84 fld.after_select(features, mc)
86 return features
88 def fetch_features(self, select):
89 features = []
91 with self.db.connect() as conn:
92 for row in conn.fetch_all(select):
93 features.append(
94 gws.base.feature.new(
95 model=self,
96 record=gws.FeatureRecord(attributes=row),
97 )
98 )
100 return features
102 def build_select(self, mc):
103 # @TODO sorting should be handled on the field level
104 sorts = mc.search.sort or self.defaultSort or []
105 for s in sorts:
106 fn = sa.desc if s.reverse else sa.asc
107 mc.dbSelect.order.append(fn(self.column(s.fieldName)))
109 sel = sa.select().select_from(self.table())
111 if mc.search.uids:
112 if not self.uidName:
113 gws.log.debug(f'build_select: {self}: no primary key for {self.tableName=}')
114 return
115 sel = sel.where(self.uid_column().in_(mc.search.uids))
117 if mc.search.keyword and not mc.dbSelect.keywordWhere:
118 gws.log.debug(f'build_select: {self}: no keyword where')
119 return
120 if mc.dbSelect.keywordWhere:
121 sel = sel.where(sa.or_(*mc.dbSelect.keywordWhere))
123 if mc.search.shape and not mc.dbSelect.geometryWhere:
124 gws.log.debug(f'build_select: {self}: no geometry where')
125 return
126 if mc.dbSelect.geometryWhere:
127 sel = sel.where(sa.or_(*mc.dbSelect.geometryWhere))
129 sel = sel.where(*mc.dbSelect.where)
130 if mc.search.extraWhere:
131 for w in mc.search.extraWhere:
132 sel = sel.where(w)
134 if self.sqlFilter:
135 sel = sel.where(sa.text('(' + self.sqlFilter + ')'))
137 cols = []
138 for col in mc.dbSelect.columns or []:
139 if any(col is c for c in cols):
140 continue
141 cols.append(col)
142 for col in mc.search.extraColumns or []:
143 if any(col is c for c in cols):
144 continue
145 cols.append(col)
147 sel = sel.add_columns(*cols)
149 if mc.dbSelect.order:
150 sel = sel.order_by(*mc.dbSelect.order)
152 return sel
154 ##
156 def init_feature(self, feature, mc):
157 if not mc.user.can_create(self):
158 raise gws.ForbiddenError(f'model {self.uid!r} can_create=False')
160 for fld in self.fields:
161 fld.do_init(feature, mc)
163 for rf in feature.createWithFeatures:
164 for fld in rf.model.fields:
165 fld.do_init_related(feature, mc)
167 def create_feature(self, feature, mc):
168 if not mc.user.can_create(self):
169 raise gws.ForbiddenError(f'model {self.uid!r} can_create=False')
171 feature.record = gws.FeatureRecord(attributes={}, meta={})
173 related_models = []
174 for from_feature in feature.createWithFeatures:
175 if from_feature.model not in related_models:
176 related_models.append(from_feature.model)
178 with self.db.connect() as conn:
179 for m in related_models:
180 for fld in m.fields:
181 fld.before_create_related(feature, mc)
183 for fld in self.fields:
184 fld.before_create(feature, mc)
186 sql = sa.insert(self.table())
187 rs = conn.execute(sql, feature.record.attributes)
188 pk = rs.inserted_primary_key
189 if not pk:
190 feature.insertedPrimaryKey = None
191 elif len(pk) == 1:
192 feature.insertedPrimaryKey = pk[0]
193 else:
194 raise gws.Error(f'composite primary keys not supported for {self.tableName!r}')
196 for fld in self.fields:
197 fld.after_create(feature, mc)
199 for m in related_models:
200 for fld in m.fields:
201 fld.after_create_related(feature, mc)
203 conn.commit()
205 return feature.insertedPrimaryKey
207 def update_feature(self, feature, mc):
208 if not mc.user.can_write(self):
209 raise gws.ForbiddenError(f'model {self.uid!r} can_write=False')
211 feature.record = gws.FeatureRecord(attributes={}, meta={})
213 with self.db.connect() as conn:
214 for fld in self.fields:
215 fld.before_update(feature, mc)
217 if not feature.record.attributes:
218 return feature.uid()
220 sql = self.table().update().where(self.uid_column().__eq__(feature.uid())).values(feature.record.attributes)
221 conn.execute(sql)
223 for fld in self.fields:
224 fld.after_update(feature, mc)
226 conn.commit()
228 return feature.uid()
230 def delete_feature(self, feature, mc):
231 if not mc.user.can_delete(self):
232 raise gws.ForbiddenError(f'model {self.uid!r} can_delete=False')
234 with self.db.connect() as conn:
235 for fld in self.fields:
236 fld.before_delete(feature, mc)
238 sql = sa.delete(self.table()).where(self.uid_column().__eq__(feature.uid()))
240 conn.execute(sql)
242 for fld in self.fields:
243 fld.after_delete(feature, mc)
245 conn.commit()
247 return feature.uid()