Coverage for gws-app/gws/plugin/model_field/related_multi_feature_list/__init__.py: 94%

94 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""Related Multi Feature List field 

2 

3Represents a 1:M relationship between a "parent" and multiple "child" tables :: 

4 

5 +---------+ +------------+ 

6 | parent | | child 1 | 

7 +---------+ +------------+ 

8 | key |-------<<| parent_key | 

9 | | +------------+ 

10 | | 

11 | | +------------+ 

12 | | | child 2 | 

13 | | +------------+ 

14 | |-------<<| parent_key | 

15 | | +------------+ 

16 | | 

17 | | +------------+ 

18 | | | child 3 | 

19 | | +------------+ 

20 | |-------<<| parent_key | 

21 +---------+ +------------+ 

22 

23 

24 

25""" 

26 

27import gws 

28import gws.base.model 

29import gws.base.model.related_field as related_field 

30import gws.lib.sa as sa 

31 

# NOTE(review): presumably registers this module under the extension name
# 'relatedMultiFeatureList' as a model field type - confirm against gws.ext.
gws.ext.new.modelField('relatedMultiFeatureList')

33 

34 

class RelatedItem(gws.Data):
    """Configuration for a related model and key."""

    # Both values are consumed by ``Object.configure_relationship``: ``toModel`` is
    # passed to ``get_model`` and ``toColumn`` to that model's ``column`` lookup.

    toModel: str
    """Related model."""
    toColumn: str
    """Key column in the related model."""

42 

43 

class Config(related_field.Config):
    """Configuration for related multi feature list field."""

    # ``fromColumn`` is handed to ``column_or_uid`` together with the field's own
    # model; the empty default is what selects the "primary key by default" case
    # mentioned below (NOTE(review): fallback lives in the base class - confirm).
    # ``related`` yields one child reference per entry.

    fromColumn: str = ''
    """Key column in this table, primary key by default."""
    related: list[RelatedItem]
    """Related models and keys."""

51 

52 

class Props(related_field.Props):
    """Props of the related multi feature list field; adds nothing to the base props."""

    pass

55 

56 

class Object(related_field.Object):
    """Feature-list field linking one parent feature to features of several child models.

    Every configured child ("to") model stores the parent's key value in its
    own key column. The hooks below read those links after a select and
    rewrite them after the parent feature has been created, updated or deleted.
    """

    attributeType = gws.AttributeType.featurelist

    def configure_relationship(self):
        """Populate ``self.rel`` with the parent reference and one reference per configured child."""

        parent_model = self.model

        self.rel = related_field.Relationship(
            src=related_field.RelRef(
                model=parent_model,
                table=parent_model.table(),
                key=self.column_or_uid(parent_model, self.cfg('fromColumn')),
                uid=parent_model.uid_column(),
            ),
            tos=[],
        )

        # One child reference per entry of the ``related`` option, in configuration order.
        for item in self.cfg('related'):
            child_model = self.get_model(item.toModel)
            child_ref = related_field.RelRef(
                model=child_model,
                table=child_model.table(),
                key=child_model.column(item.toColumn),
                uid=child_model.uid_column(),
            )
            self.rel.tos.append(child_ref)

    # Model hooks

    def before_create_related(self, to_feature, mc):
        """Pre-fill the key attribute of a child feature that is created together with a parent.

        Looks through ``to_feature.createWithFeatures`` for features of this
        field's model and writes the parent's key into the record of
        ``to_feature``. Stops after the first child reference whose model
        matches ``to_feature``.
        """

        for parent_feature in to_feature.createWithFeatures:
            if parent_feature.model != self.model:
                continue

            parent_key = self.key_for_uid(
                self.rel.src.model,
                self.rel.src.key,
                parent_feature.uid(),
                mc,
            )

            for to in self.rel.tos:
                if to_feature.model == to.model:
                    to_feature.record.attributes[to.key.name] = parent_key
                    return

    def after_select(self, features, mc):
        """Attach the related child features of every child model to the selected parents.

        Does nothing if the user may not read this field or if the context's
        ``relDepth`` has reached ``maxDepth``.
        """

        if not mc.user.can_read(self):
            return
        if mc.relDepth >= mc.maxDepth:
            return

        # Every parent starts with an empty list, so the attribute is always set.
        for parent in features:
            parent.set(self.name, [])

        parents_by_uid = {parent.uid(): parent for parent in features}
        src = self.rel.src

        for to in self.rel.tos:
            # Pairs of (child uid, parent uid) for all selected parents.
            joined = to.table.join(src.table, src.key.__eq__(to.key))
            sql = (
                sa.select(to.uid, src.uid)
                .select_from(joined)
                .where(src.uid.in_(parents_by_uid))
            )

            parent_uids_by_child = {}
            with self.model.db.connect() as conn:
                for child_uid, parent_uid in conn.execute(sql):
                    parent_uids_by_child.setdefault(str(child_uid), []).append(str(parent_uid))

            child_features = to.model.get_features(
                parent_uids_by_child,
                gws.base.model.secondary_context(mc),
            )
            for child in child_features:
                for parent_uid in parent_uids_by_child.get(child.uid(), []):
                    # NOTE(review): the lookup keys here are strings; this assumes
                    # ``uid()`` of the parent features returns strings too - confirm.
                    parents_by_uid.get(parent_uid).get(self.name).append(child)

    def after_create(self, feature, mc):
        """Synchronise child links once the parent has been inserted.

        The key is resolved from ``feature.insertedPrimaryKey``.
        """

        parent_key = self.key_for_uid(
            self.model,
            self.rel.src.key,
            feature.insertedPrimaryKey,
            mc,
        )
        self.after_write(feature, parent_key, mc)

    def after_update(self, feature, mc):
        """Synchronise child links once the parent has been updated.

        The key is resolved from ``feature.uid()``.
        """

        parent_key = self.key_for_uid(
            self.model,
            self.rel.src.key,
            feature.uid(),
            mc,
        )
        self.after_write(feature, parent_key, mc)

    def after_write(self, feature: gws.Feature, key, mc: gws.ModelContext):
        """Make the stored links of each child model match the feature's current list.

        Children present in the feature but not yet linked get ``key`` written
        to their key column; children linked in the database but missing from
        the feature are passed to ``drop_links``. Child models the user may not
        edit are skipped.
        """

        if not mc.user.can_write(self):
            return
        if mc.relDepth >= mc.maxDepth:
            return

        for to in self.rel.tos:
            if not mc.user.can_edit(to.model):
                continue

            stored_uids = self.to_uids_for_key(to, key, mc)
            wanted_uids = {
                child.uid()
                for child in feature.get(self.name, [])
                if child.model == to.model
            }

            to_link = wanted_uids - stored_uids
            if to_link:
                sql = (
                    sa.update(to.table)
                    .values({to.key.name: key})
                    .where(to.uid.in_(to_link))
                )
                with to.model.db.connect() as conn:
                    conn.execute(sql)

            self.drop_links(to, stored_uids - wanted_uids, mc)

    def before_delete(self, feature, mc):
        """Remember the parent's key on ``mc`` so that ``after_delete`` can still find its children."""

        if not mc.user.can_write(self):
            return
        if mc.relDepth >= mc.maxDepth:
            return

        parent_key = self.key_for_uid(
            self.model,
            self.rel.src.key,
            feature.uid(),
            mc,
        )
        # Stored under a name specific to this field's uid.
        setattr(mc, f'_DELETED_KEY_{self.uid}', parent_key)

    def after_delete(self, features, mc):
        """Drop the links of all children that pointed to the deleted parent.

        Uses the key stashed on ``mc`` by ``before_delete``.
        """

        if not mc.user.can_write(self):
            return
        if mc.relDepth >= mc.maxDepth:
            return

        # NOTE(review): a single key is read here although ``features`` is plural -
        # confirm how the caller pairs ``before_delete`` and ``after_delete``.
        parent_key = getattr(mc, f'_DELETED_KEY_{self.uid}')

        for to in self.rel.tos:
            if not mc.user.can_edit(to.model):
                continue
            self.drop_links(to, self.to_uids_for_key(to, parent_key, mc), mc)

    # Helpers

    def to_uids_for_key(self, to: related_field.RelRef, key, mc):
        """Return the set of uids (as strings) of rows in ``to`` whose key column equals ``key``."""

        sql = sa.select(to.uid).where(to.key.__eq__(key))
        with to.model.db.connect() as conn:
            return {str(row[0]) for row in conn.execute(sql)}

    def drop_links(self, to: related_field.RelRef, to_uids, mc):
        """Detach the child rows with the given uids from their parent.

        When ``self.rel.deleteCascade`` is set, the rows are deleted;
        otherwise their key column is set to NULL. An empty ``to_uids`` is a no-op.
        """

        if not to_uids:
            return

        affected = to.uid.in_(to_uids)
        if self.rel.deleteCascade:
            sql = sa.delete(to.table).where(affected)
        else:
            sql = sa.update(to.table).values({to.key.name: None}).where(affected)

        with to.model.db.connect() as conn:
            conn.execute(sql)