Coverage for gws-app / gws / plugin / qfieldcloud / patcher.py: 0%
150 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1from typing import cast, Optional
3import gws
4import gws.base.shape
5import gws.plugin.model_field.file as file_field
6import gws.base.feature
7import gws.lib.osx
9from . import core, caps as caps_mod
class ChangeType(gws.Enum):
    """Kind of a single change handled by the patcher."""

    # Modification of an existing feature (queued as a model ``update``).
    patch = 'patch'
    # Insertion of a new feature.
    create = 'create'
    # Removal of an existing feature.
    delete = 'delete'
class Change(gws.Data):
    """One feature-level change to be applied to a model."""

    # Identifier of the change itself (not read by the code in this module).
    uid: str
    # What to do with the feature: patch, create or delete.
    type: ChangeType
    # Key into ``Caps.layerMap`` selecting the layer (and thus the model) to change.
    layerUid: str
    # Attribute values after the change; used as feature attributes for create/patch.
    newAtts: dict
    # Attribute values before the change; the primary key of the target feature is read from here.
    oldAtts: dict
    # Geometry as WKT; when non-empty it is parsed using the model's geometry CRS.
    wkt: str
class Operation(gws.Data):
    """A prepared model operation waiting to be committed."""

    # Model operation to run (create, update or delete).
    type: gws.ModelOperation
    # Feature the operation is applied to.
    feature: gws.Feature
class Args(gws.Data):
    """Arguments for ``Object.apply_changes`` and ``Object.apply_upload``."""

    # QField Cloud project the request belongs to.
    qfcProject: core.QfcProject
    # Capabilities object providing ``layerMap`` and ``modelMap``.
    caps: caps_mod.Caps
    # Project and user used to build model contexts.
    project: gws.Project
    user: gws.User
    # Base directory (not read by the code in this module).
    baseDir: str
    # Changes to apply; consumed by ``apply_changes``.
    changes: list[Change]
    # Name and content of an uploaded file; consumed by ``apply_upload``.
    filePath: str
    fileContent: bytes
class Object:
    """Applies QField Cloud changes and file uploads to the models of a project.

    The object is stateful: ``apply_changes`` and ``apply_upload`` first store
    the request arguments on ``self`` (see ``prepare``) and then work from there.
    """

    root: gws.Root
    qfcProject: core.QfcProject
    project: gws.Project
    user: gws.User
    args: Args

    caps: caps_mod.Caps
    # Operations waiting to be committed, keyed by ``ModelEntry.gpName``.
    ops_by_model: dict[str, list[Operation]]

    def prepare(self, args: Args):
        """Store ``args`` and copy its frequently used members onto ``self``."""
        self.args = args
        self.qfcProject = args.qfcProject
        self.project = args.project
        self.user = args.user
        self.caps = args.caps

    def apply_changes(self, root: gws.Root, args: Args):
        """Turn ``args.changes`` into model operations and commit them per model.

        Args:
            root: Application root object.
            args: Request arguments; ``changes``, ``caps``, ``user`` and ``project`` are used.
        """
        self.root = root
        self.prepare(args)

        self.ops_by_model = {}

        # Tolerate a request without any changes.
        for cc in self.args.changes or []:
            self.prepare_change(cc)

        # Only models with at least one queued operation are present here,
        # see ``_add_operation``.
        for gp_name, ops in self.ops_by_model.items():
            self.commit_operations_for_model(self.caps.modelMap[gp_name], ops)

    def commit_operations_for_model(self, me: caps_mod.ModelEntry, ops: list[Operation]):
        """Run all queued operations of one model and commit once at the end.

        Operations with a type other than create/update/delete are skipped.
        """
        with me.model.db.connect() as conn:
            for op in ops:
                gws.log.debug(f'{op.type=} {op.feature.attributes=}')
                mc = gws.ModelContext(op=op.type, user=self.user, project=self.project)
                if op.type == gws.ModelOperation.create:
                    me.model.create_feature(op.feature, mc)
                elif op.type == gws.ModelOperation.update:
                    me.model.update_feature(op.feature, mc)
                elif op.type == gws.ModelOperation.delete:
                    me.model.delete_feature(op.feature, mc)
            conn.commit()

    def apply_upload(self, root: gws.Root, args: Args):
        """Store the uploaded file ``args.filePath`` / ``args.fileContent`` in a matching feature."""
        self.root = root
        self.prepare(args)
        self.commit_upload(args.filePath, args.fileContent)

    def commit_upload(self, path: str, content: bytes):
        """Write ``content`` to the first model that has a feature referring to ``path``.

        Logs a warning if no model contains such a feature.
        """
        for me in self.caps.modelMap.values():
            if self.commit_upload_for_model(me, path, content):
                return
        gws.log.warning(f'commit_upload: feature not found: {path=}')

    def commit_upload_for_model(self, me: caps_mod.ModelEntry, path: str, content: bytes) -> bool:
        """Try to store ``content`` in one of the file fields of the model.

        Each field with ``extType == 'file'`` and a name column is checked for a
        feature whose name column equals ``path``; the first hit gets its content
        column updated.

        Returns:
            ``True`` if a feature was found and updated, ``False`` otherwise.
        """
        mc = gws.ModelContext(op=gws.ModelOperation.update, user=self.user, project=self.project)

        for fld in me.model.fields:
            if fld.extType != 'file':
                continue
            ff = cast(file_field.Object, fld)
            if ff.nameColumn is None:
                continue
            uid = self.find_uid_for_path(me, ff, path, mc)
            if not uid:
                continue
            gws.log.debug(f'commit_upload: found feature: model={me.gpName}: {ff.name=} {uid=} {path=} ')

            with me.model.db.connect() as conn:
                sql = (
                    me.model.table()
                    .update()
                    .where(me.model.uid_column() == uid)
                    .values({ff.contentColumn: content})
                )
                conn.execute(sql)
                conn.commit()

            return True

        return False

    def find_uid_for_path(
        self,
        me: caps_mod.ModelEntry,
        ff: file_field.Object,
        path: str,
        mc: gws.ModelContext,
    ) -> Optional[str]:
        """Return the uid of the feature whose file name column equals ``path``.

        ``mc`` is accepted for interface compatibility and is not used.

        Returns:
            The value of the model's uid column, or ``None`` if nothing matches.
        """
        with me.model.db.connect() as conn:
            # NOTE(review): this builds a SQL condition only if ``ff.nameColumn``
            # is a column object; if it is a plain column name (str), the
            # comparison is evaluated in Python instead - confirm.
            sel = (
                me.model.table()
                .select()
                .with_only_columns(me.model.uid_column())
                .where(ff.nameColumn == path)
            )
            rec = conn.fetch_first(sel)
            if rec:
                return rec[me.model.uidName]
        return None

    def prepare_change(self, cc: Change):
        """Convert a change into an operation and queue it in ``ops_by_model``.

        Changes that refer to an unknown layer, to a feature that cannot be
        found (patch/delete), or that have an unknown type are logged and skipped.
        """
        le = self.caps.layerMap.get(cc.layerUid)
        if not le:
            gws.log.warning(f'layer not found: {cc.layerUid!r}')
            return

        me = le.modelEntry
        pk_name = me.model.uidName

        # Either attribute dict may be missing on a change; treat that as empty
        # instead of failing on ``dict(None)`` / ``None.get``.
        atts = dict(cc.newAtts or {})
        old_atts = cc.oldAtts or {}

        # see notes in packager.py about fid/fid_gws
        if 'fid_gws' in atts:
            atts['fid'] = atts.pop('fid_gws')

        if cc.wkt:
            geom = me.model.geometryName
            if not geom:
                gws.log.warning(f'geometry field not found: {me.gpName!r}')
            else:
                atts[geom] = gws.base.shape.from_wkt(cc.wkt, me.model.geometryCrs)

        if cc.type == ChangeType.create:
            mc = gws.ModelContext(op=gws.ModelOperation.create, user=self.user, project=self.project)
            pk_field = me.model.field(pk_name)
            # Drop a client-supplied key when the key field is marked as auto.
            if pk_field and pk_field.isAuto:
                atts.pop(pk_name, None)
            feat = me.model.feature_from_props(gws.FeatureProps(attributes=atts, isNew=True), mc)
            self._add_operation(me, gws.ModelOperation.create, feat)
            return

        if cc.type == ChangeType.delete:
            pk = old_atts.get(pk_name, '')
            feat = self.get_feature(me, pk)
            if not feat:
                gws.log.warning(f'delete: not found: {pk=} {me.gpName=} {le.qgisId=}')
                return
            self._add_operation(me, gws.ModelOperation.delete, feat)
            return

        if cc.type == ChangeType.patch:
            pk = old_atts.get(pk_name, '')
            # The lookup only verifies that the feature exists; the queued
            # feature is rebuilt from the new attributes below.
            if not self.get_feature(me, pk):
                gws.log.warning(f'update: not found: {pk=} {me.gpName=} {le.qgisId=}')
                return
            mc = gws.ModelContext(op=gws.ModelOperation.update, user=self.user, project=self.project)
            atts[pk_name] = pk
            feat = me.model.feature_from_props(gws.FeatureProps(attributes=atts), mc)
            self._add_operation(me, gws.ModelOperation.update, feat)
            return

        gws.log.warning(f'unknown change type: {cc.type!r}')

    def _add_operation(self, me: caps_mod.ModelEntry, op_type: gws.ModelOperation, feat: gws.Feature):
        """Queue an operation for the model, creating its list on first use."""
        self.ops_by_model.setdefault(me.gpName, []).append(Operation(type=op_type, feature=feat))

    def get_feature(self, me: caps_mod.ModelEntry, pk: str) -> Optional[gws.Feature]:
        """Return the feature with the key ``pk`` from the model, or ``None`` if there is none."""
        mc = gws.ModelContext(op=gws.ModelOperation.read, user=self.user, project=self.project)
        fs = me.model.get_features([pk], mc)
        return fs[0] if fs else None