Coverage for gws-app/gws/plugin/model_field/related_multi_feature_list/__init__.py: 94%
94 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""Related Multi Feature List field
3Represents a 1:M relationship between a "parent" and multiple "child" tables ::
5 +---------+ +------------+
6 | parent | | child 1 |
7 +---------+ +------------+
8 | key |-------<<| parent_key |
9 | | +------------+
10 | |
11 | | +------------+
12 | | | child 2 |
13 | | +------------+
14 | |-------<<| parent_key |
15 | | +------------+
16 | |
17 | | +------------+
18 | | | child 3 |
19 | | +------------+
20 | |-------<<| parent_key |
21 +---------+ +------------+
25"""
27import gws
28import gws.base.model
29import gws.base.model.related_field as related_field
30import gws.lib.sa as sa
# Calls gws.ext.new.modelField with the type name 'relatedMultiFeatureList'.
# NOTE(review): presumably this registers/declares the module as the model field
# extension of that type for the gws extension machinery -- confirm against gws.ext.
32gws.ext.new.modelField('relatedMultiFeatureList')
class RelatedItem(gws.Data):
    """One related ("child") model together with the column that links it to the parent."""

    toModel: str
    """Related model (looked up with ``get_model`` when the relationship is configured)."""

    toColumn: str
    """Column of the related model that holds the parent key."""
class Config(related_field.Config):
    """Configuration of a related multi feature list field."""

    fromColumn: str = ''
    """Key column in the table of this model; the primary key is used by default."""

    related: list[RelatedItem]
    """Related models with their key columns, one entry per child model."""
class Props(related_field.Props):
    """Props of the field; nothing is added to ``related_field.Props``."""
# Field object for the 1:M relation between this ("parent") model and several
# "child" models; the relation itself is stored in `self.rel` (one source
# reference plus a list of target references, see `configure_relationship`).
57class Object(related_field.Object):
# The value of this field is a feature list attribute.
58 attributeType = gws.AttributeType.featurelist
def configure_relationship(self):
    """Populate ``self.rel`` from the field configuration.

    The source reference points at this field's own model; its key column
    comes from ``column_or_uid`` applied to the ``fromColumn`` option.
    One target reference is appended for every entry of the ``related``
    option, using the entry's ``toModel`` and ``toColumn``.
    """
    source = related_field.RelRef(
        model=self.model,
        table=self.model.table(),
        key=self.column_or_uid(self.model, self.cfg('fromColumn')),
        uid=self.model.uid_column(),
    )
    self.rel = related_field.Relationship(src=source, tos=[])

    for item in self.cfg('related'):
        target_model = self.get_model(item.toModel)
        target = related_field.RelRef(
            model=target_model,
            table=target_model.table(),
            key=target_model.column(item.toColumn),
            uid=target_model.uid_column(),
        )
        self.rel.tos.append(target)
82 ##
# Pre-fill the parent key on a child feature that is created together with a
# parent feature of this field's model.
#
# `to_feature` is the child feature about to be created; the features it is
# created with are read from `to_feature.createWithFeatures`.
#
# NOTE(review): this listing has lost its indentation, so the nesting level of
# the trailing `return` (inside the innermost `if`, after the inner `for`, or
# further out) cannot be determined from here -- confirm against the original
# file before restructuring this method.
84 def before_create_related(self, to_feature, mc):
85 for feature in to_feature.createWithFeatures:
# Only a feature of this field's own model can act as the parent.
86 if feature.model == self.model:
# Resolve the value of the parent's key column from the parent's uid.
87 key = self.key_for_uid(
88 self.rel.src.model,
89 self.rel.src.key,
90 feature.uid(),
91 mc,
92 )
# Write that key into the child's record, under the key column name of the
# target reference whose model is the child's model.
93 for to in self.rel.tos:
94 if to_feature.model == to.model:
95 to_feature.record.attributes[to.key.name] = key
96 return
def after_select(self, features, mc):
    """Attach the related child features to every selected parent feature.

    Each feature in ``features`` first gets an empty list under this field's
    name. Then, for every target in ``self.rel.tos``, the (child uid,
    parent uid) pairs are read from the database, the child features are
    loaded through the target model with a secondary context, and every
    child is appended to the list of each parent it is linked to.

    Nothing is done when the user cannot read this field or when
    ``mc.relDepth`` has reached ``mc.maxDepth``.

    Args:
        features: Parent features produced by the select.
        mc: Model context of the current operation.
    """
    if not mc.user.can_read(self) or mc.relDepth >= mc.maxDepth:
        return

    for parent in features:
        parent.set(self.name, [])

    parent_by_raw_uid = {parent.uid(): parent for parent in features}

    # Uids read from the database are stringified below, so the lookup table
    # must be keyed by the string form as well; keyed by the raw uid, a
    # non-string uid would never be found again.
    parent_by_uid = {str(uid): parent for uid, parent in parent_by_raw_uid.items()}

    src = self.rel.src

    for to in self.rel.tos:
        # The query itself is still filtered by the unmodified uid values.
        sql = (
            sa.select(to.uid, src.uid)
            .select_from(to.table.join(src.table, src.key.__eq__(to.key)))
            .where(src.uid.in_(list(parent_by_raw_uid)))
        )

        # child uid (as string) -> uids (as strings) of the parents it belongs to
        child_to_parents = {}
        with self.model.db.connect() as conn:
            for child_uid, parent_uid in conn.execute(sql):
                child_to_parents.setdefault(str(child_uid), []).append(str(parent_uid))

        children = to.model.get_features(
            child_to_parents,
            gws.base.model.secondary_context(mc),
        )
        for child in children:
            # Same normalisation as for the dictionary keys above.
            for parent_uid in child_to_parents.get(str(child.uid()), []):
                parent = parent_by_uid.get(parent_uid)
                # A row whose parent uid cannot be mapped back is skipped
                # instead of failing with an AttributeError on None.
                if parent is not None:
                    parent.get(self.name).append(child)
def after_create(self, feature, mc):
    """Synchronise the child links of a freshly created parent feature.

    The parent key is resolved from ``feature.insertedPrimaryKey`` and
    passed on to ``after_write``.
    """
    parent_key = self.key_for_uid(self.model, self.rel.src.key, feature.insertedPrimaryKey, mc)
    self.after_write(feature, parent_key, mc)
def after_update(self, feature, mc):
    """Synchronise the child links of an updated parent feature.

    The parent key is resolved from ``feature.uid()`` and passed on to
    ``after_write``.
    """
    parent_key = self.key_for_uid(self.model, self.rel.src.key, feature.uid(), mc)
    self.after_write(feature, parent_key, mc)
def after_write(self, feature: gws.Feature, key, mc: gws.ModelContext):
    """Bring the child links in the database in line with the feature's list.

    For every target model the user may edit, the uids currently linked to
    ``key`` are compared with the uids of the features stored under this
    field's name that belong to that model. Newly listed children get their
    key column set to ``key``; children no longer listed are handed to
    ``drop_links``.

    Nothing is done when the user cannot write this field or when
    ``mc.relDepth`` has reached ``mc.maxDepth``.

    Args:
        feature: The parent feature that was written.
        key: Value of the parent's key column.
        mc: Model context of the current operation.
    """
    if not mc.user.can_write(self) or mc.relDepth >= mc.maxDepth:
        return

    for to in self.rel.tos:
        if not mc.user.can_edit(to.model):
            continue

        linked_now = self.to_uids_for_key(to, key, mc)
        wanted = {
            child.uid()
            for child in feature.get(self.name, [])
            if child.model == to.model
        }

        to_link = wanted - linked_now
        if to_link:
            stmt = sa.update(to.table).values({to.key.name: key}).where(to.uid.in_(to_link))
            with to.model.db.connect() as conn:
                conn.execute(stmt)

        self.drop_links(to, linked_now - wanted, mc)
def before_delete(self, feature, mc):
    """Remember the key of a parent feature that is about to be deleted.

    The key is stored on ``mc`` under an attribute name derived from this
    field's uid, where ``after_delete`` reads it back. Nothing is stored
    when the user cannot write this field or when ``mc.relDepth`` has
    reached ``mc.maxDepth``.
    """
    if mc.user.can_write(self) and mc.relDepth < mc.maxDepth:
        parent_key = self.key_for_uid(self.model, self.rel.src.key, feature.uid(), mc)
        setattr(mc, f'_DELETED_KEY_{self.uid}', parent_key)
def after_delete(self, features, mc):
    """Drop the links of all children that pointed at the deleted parent.

    The parent key is the one stored on ``mc`` by ``before_delete``; the
    ``features`` argument itself is not read here. Targets whose model the
    user may not edit are left alone. Nothing is done when the user cannot
    write this field or when ``mc.relDepth`` has reached ``mc.maxDepth``.
    """
    if not mc.user.can_write(self) or mc.relDepth >= mc.maxDepth:
        return

    deleted_key = getattr(mc, f'_DELETED_KEY_{self.uid}')

    editable_targets = (to for to in self.rel.tos if mc.user.can_edit(to.model))
    for to in editable_targets:
        self.drop_links(to, self.to_uids_for_key(to, deleted_key, mc), mc)
def to_uids_for_key(self, to: related_field.RelRef, key, mc):
    """Return the uids of the rows of ``to`` whose key column equals ``key``.

    The uids are returned as a set of strings.

    NOTE(review): SQLAlchemy renders a comparison with ``None`` as
    ``IS NULL``, so a ``None`` key would select the rows with an empty key
    column -- confirm that callers never pass ``None`` here.
    """
    stmt = sa.select(to.uid).where(to.key.__eq__(key))
    with to.model.db.connect() as conn:
        rows = conn.execute(stmt)
        return {str(row[0]) for row in rows}
def drop_links(self, to: related_field.RelRef, to_uids, mc):
    """Detach the child rows with the given uids from their parent.

    With ``self.rel.deleteCascade`` set the rows are deleted, otherwise
    their key column is set to NULL. An empty ``to_uids`` is a no-op.
    """
    if not to_uids:
        return

    table = to.table
    if self.rel.deleteCascade:
        stmt = sa.delete(table).where(to.uid.in_(to_uids))
    else:
        stmt = sa.update(table).values({to.key.name: None}).where(to.uid.in_(to_uids))

    with to.model.db.connect() as conn:
        conn.execute(stmt)