Coverage for gws-app / gws / plugin / qfieldcloud / patcher.py: 0%

150 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1from typing import cast, Optional 

2 

3import gws 

4import gws.base.shape 

5import gws.plugin.model_field.file as file_field 

6import gws.base.feature 

7import gws.lib.osx 

8 

9from . import core, caps as caps_mod 

10 

11 

class ChangeType(gws.Enum):
    """Kind of a single change record handled by the patcher."""

    # modify an existing feature (turned into a model ``update`` operation)
    patch = 'patch'
    # add a new feature
    create = 'create'
    # remove an existing feature
    delete = 'delete'

16 

17 

class Change(gws.Data):
    """One change record to be applied to a layer's model."""

    # identifier of this change record
    uid: str
    # what to do: patch, create or delete
    type: ChangeType
    # key used to look the layer up in ``Caps.layerMap``
    layerUid: str
    # attribute values after the change; copied into the feature attributes
    newAtts: dict
    # attribute values before the change; the primary key is read from here
    oldAtts: dict
    # geometry as WKT; when set it is parsed and stored under the model's geometry name
    wkt: str

25 

26 

class Operation(gws.Data):
    """A prepared model operation, ready to be committed."""

    # create, update or delete; selects which model method is invoked
    type: gws.ModelOperation
    # the feature passed to that model method
    feature: gws.Feature

30 

31 

class Args(gws.Data):
    """Input bundle for ``Object.apply_changes`` and ``Object.apply_upload``."""

    qfcProject: core.QfcProject
    # provides the ``layerMap`` and ``modelMap`` lookups
    caps: caps_mod.Caps
    # project and user are placed in every ``gws.ModelContext`` the patcher builds
    project: gws.Project
    user: gws.User
    baseDir: str
    # change records, consumed by ``apply_changes``
    changes: list[Change]
    # name and content of an uploaded file, consumed by ``apply_upload``
    filePath: str
    fileContent: bytes

41 

42 

class Object:
    """Applies change records and file uploads to the project's models.

    Entry points are ``apply_changes`` (feature create/update/delete) and
    ``apply_upload`` (store uploaded file content in a file field).
    """

    root: gws.Root
    qfcProject: core.QfcProject
    project: gws.Project
    user: gws.User
    args: Args

    caps: caps_mod.Caps
    # prepared operations, grouped by the ``gpName`` of the model entry
    ops_by_model: dict[str, list[Operation]]

    def prepare(self, args: Args):
        """Store ``args`` and copy its frequently used members onto the instance."""
        self.args = args
        self.qfcProject = args.qfcProject
        self.project = args.project
        self.user = args.user
        self.caps = args.caps

    def apply_changes(self, root: gws.Root, args: Args):
        """Convert ``args.changes`` into model operations and commit them per model.

        Changes that cannot be resolved (unknown layer, feature not found)
        are logged and skipped; they do not abort the remaining changes.
        """
        self.root = root
        self.prepare(args)

        self.ops_by_model = {}

        # tolerate an Args object whose change list was never set
        for cc in self.args.changes or []:
            self.prepare_change(cc)

        if not self.ops_by_model:
            return

        for gp_name, ops in self.ops_by_model.items():
            self.commit_operations_for_model(self.caps.modelMap[gp_name], ops)

    def commit_operations_for_model(self, me: caps_mod.ModelEntry, ops: list[Operation]):
        """Run all operations for one model and commit once at the end."""
        with me.model.db.connect() as conn:
            for op in ops:
                gws.log.debug(f'{op.type=} {op.feature.attributes=}')
                mc = gws.ModelContext(op=op.type, user=self.user, project=self.project)
                if op.type == gws.ModelOperation.create:
                    me.model.create_feature(op.feature, mc)
                elif op.type == gws.ModelOperation.update:
                    me.model.update_feature(op.feature, mc)
                elif op.type == gws.ModelOperation.delete:
                    me.model.delete_feature(op.feature, mc)
            # a single commit covers every operation of this model
            conn.commit()

    def apply_upload(self, root: gws.Root, args: Args):
        """Store the uploaded file ``args.filePath`` / ``args.fileContent``."""
        self.root = root
        self.prepare(args)
        self.commit_upload(args.filePath, args.fileContent)

    def commit_upload(self, path: str, content: bytes):
        """Write ``content`` to the first model that has a record for ``path``.

        Logs a warning when no model has a matching record.
        """
        for me in self.caps.modelMap.values():
            if self.commit_upload_for_model(me, path, content):
                return
        gws.log.warning(f'commit_upload: feature not found: {path=}')

    def commit_upload_for_model(self, me: caps_mod.ModelEntry, path: str, content: bytes) -> bool:
        """Try to store ``content`` in one of the model's file fields.

        Returns:
            True if a record matching ``path`` was found and updated,
            False otherwise.
        """
        mc = gws.ModelContext(op=gws.ModelOperation.update, user=self.user, project=self.project)

        for fld in me.model.fields:
            if fld.extType != 'file':
                continue
            fld = cast(file_field.Object, fld)
            # without a name column there is nothing to match the path against
            if fld.nameColumn is None:
                continue
            uid = self.find_uid_for_path(me, fld, path, mc)
            if not uid:
                continue
            gws.log.debug(f'commit_upload: found feature: model={me.gpName}: {fld.name=} {uid=} {path=} ')

            with me.model.db.connect() as conn:
                sql = (
                    me.model.table()
                    .update()
                    .where(me.model.uid_column() == uid)
                    .values({fld.contentColumn: content})
                )
                conn.execute(sql)
                conn.commit()

            # only the first matching field is updated
            return True

        return False

    def find_uid_for_path(
        self,
        me: caps_mod.ModelEntry,
        ff: file_field.Object,
        path: str,
        mc: gws.ModelContext,
    ) -> Optional[str]:
        """Return the uid of the first record whose name column equals ``path``.

        Returns:
            The value of the model's uid column, or None if no record matches.
        """
        with me.model.db.connect() as conn:
            # NOTE(review): this assumes ``ff.nameColumn`` is a column expression;
            # if it is a plain column name the comparison yields a Python bool - confirm.
            sel = (
                me.model.table()
                .select()
                .with_only_columns(me.model.uid_column())
                .where(ff.nameColumn == path)
            )
            rec = conn.fetch_first(sel)
            if rec:
                return rec[me.model.uidName]
        return None

    def prepare_change(self, cc: Change):
        """Turn one change record into an ``Operation`` in ``self.ops_by_model``.

        Nothing is registered (only a warning is logged) when the layer is
        unknown, the change type is not recognised, or the feature to be
        deleted or patched cannot be found.
        """
        le = self.caps.layerMap.get(cc.layerUid)
        if not le:
            gws.log.warning(f'layer not found: {cc.layerUid!r}')
            return

        me = le.modelEntry
        pk_name = me.model.uidName

        # either attribute dict may be unset (e.g. no new attributes on a delete)
        atts = dict(cc.newAtts or {})
        old_atts = cc.oldAtts or {}

        # see notes in packager.py about fid/fid_gws
        if 'fid_gws' in atts:
            atts['fid'] = atts.pop('fid_gws')

        if cc.wkt:
            geom = me.model.geometryName
            if not geom:
                gws.log.warning(f'geometry field not found: {me.gpName!r}')
            else:
                atts[geom] = gws.base.shape.from_wkt(cc.wkt, me.model.geometryCrs)

        if cc.type == ChangeType.create:
            mc = gws.ModelContext(op=gws.ModelOperation.create, user=self.user, project=self.project)
            pk_field = me.model.field(pk_name)
            # drop the submitted key when the primary key field is marked as auto
            if pk_field and pk_field.isAuto:
                atts.pop(pk_name, None)
            feat = me.model.feature_from_props(gws.FeatureProps(attributes=atts, isNew=True), mc)
            self._add_operation(me, gws.ModelOperation.create, feat)
            return

        if cc.type == ChangeType.delete:
            pk = old_atts.get(pk_name, '')
            feat = self.get_feature(me, pk)
            if not feat:
                gws.log.warning(f'delete: not found: {pk=} {me.gpName=} {le.qgisId=}')
                return
            self._add_operation(me, gws.ModelOperation.delete, feat)
            return

        if cc.type == ChangeType.patch:
            pk = old_atts.get(pk_name, '')
            # the lookup only verifies that the feature exists
            if not self.get_feature(me, pk):
                gws.log.warning(f'update: not found: {pk=} {me.gpName=} {le.qgisId=}')
                return
            mc = gws.ModelContext(op=gws.ModelOperation.update, user=self.user, project=self.project)
            # the update is addressed by the old primary key
            atts[pk_name] = pk
            feat = me.model.feature_from_props(gws.FeatureProps(attributes=atts), mc)
            self._add_operation(me, gws.ModelOperation.update, feat)
            return

        gws.log.warning(f'unknown change type: {cc.type!r}')

    def _add_operation(self, me: caps_mod.ModelEntry, op_type: gws.ModelOperation, feat: gws.Feature):
        """Queue an operation under the model's ``gpName``.

        The per-model list is created only here, so a model with no
        resolvable changes never shows up in ``ops_by_model``.
        """
        self.ops_by_model.setdefault(me.gpName, []).append(Operation(type=op_type, feature=feat))

    def get_feature(self, me: caps_mod.ModelEntry, pk: str) -> gws.Feature | None:
        """Load the feature with primary key ``pk``, or return None if there is none."""
        mc = gws.ModelContext(op=gws.ModelOperation.read, user=self.user, project=self.project)
        fs = me.model.get_features([pk], mc)
        return fs[0] if fs else None