Coverage for gws-app / gws / plugin / qfieldcloud / caps.py: 0%
273 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1from typing import Optional, cast
3import gws
4import gws.base.shape
5import gws.gis.source
6import gws.lib.datetimex as dtx
7import gws.lib.jsonx
8import gws.lib.osx
9import gws.lib.crs
10import gws.plugin.qgis.caps
12from . import core
15class ProjectProps(gws.Data):
16 """Custom project properties as defined by QField."""
18 areaOfInterest: str
19 areaOfInterestCrs: str
20 baseMapLayer: str
21 baseMapTheme: str
22 baseMapTileSize: int
23 baseMapTilesMaxZoomLevel: int
24 baseMapTilesMinZoomLevel: int
25 baseMapType: str
26 createBaseMap: bool
27 digitizingLogsLayer: str
28 forceAutoPush: bool
29 forceAutoPushIntervalMins: int
30 forceStamping: bool
31 geofencingBehavior: int
32 geofencingIsActive: bool
33 geofencingLayer: str
34 geofencingShouldPreventDigitizing: bool
35 mapThemesActiveLayers: dict
36 maximumImageWidthHeight: int
37 offlineCopyOnlyAoi: bool
38 stampingDetailsTemplate: str
39 stampingFontStyle: str
40 stampingHorizontalAlignment: int
41 stampingImageDecoration: str
43 attachmentDirs: list[str]
44 dataDirs: list[str]
45 dirsToCopy: dict
48class LayerProps(gws.Data):
49 """Custom layer properties as defined by QField."""
51 action: str
52 attachment_naming: dict
53 attribute_editing_locked_expression: str
54 cloud_action: str
55 feature_addition_locked_expression: str
56 feature_deletion_locked_expression: str
57 geometry_editing_locked_expression: str
58 is_attribute_editing_locked: bool
59 is_feature_addition_locked: bool
60 is_feature_deletion_locked: bool
61 is_geometry_editing_locked: bool
62 photo_naming: dict
63 relationship_maximum_visible: dict
64 tracking_distance_requirement_minimum_meters: int
65 tracking_erroneous_distance_safeguard_maximum_meters: int
66 tracking_measurement_type: int
67 tracking_time_requirement_interval_seconds: int
68 value_map_button_interface_threshold: int
71class LayerAction(gws.Enum):
72 remove = 'remove'
73 edit = 'edit'
74 baseMap = 'baseMap'
77class ModelEntry(gws.Data):
78 gpName: str
79 tableName: str
80 model: gws.DatabaseModel
83class LayerEntry(gws.Data):
84 action: LayerAction
85 qgisId: str
86 modelEntry: ModelEntry
87 readOnly: bool
88 sqlFilter: str
89 dataSourceFileName: str
90 dataSource: str
91 dataProvider: str
92 sourceLayer: gws.SourceLayer
93 props: LayerProps
96class Caps(gws.Data):
97 """QField related capabilities extracted from the QGIS project and GWS config."""
99 sourceHash: str
100 qgisPath: str
101 layerMap: dict[str, LayerEntry]
102 modelMap: dict[str, ModelEntry]
103 copyDirs: list[str]
104 baseMapLayerIds: list[str]
105 areaOfInterest: Optional[gws.Bounds]
106 copyOnlyAreaOfInterest: bool
107 projectProps: ProjectProps
110class Parser:
111 """Read qf-related capabilities from the qgis project."""
113 project: core.QfcProject
114 caps: Caps
115 qgisCaps: gws.plugin.qgis.caps.Caps
117 def __init__(self, qfc_project: core.QfcProject):
118 self.qfcProject = qfc_project
120 def parse(self) -> Caps:
121 qp = self.qfcProject.qgisProvider.qgis_project()
122 self.qgisCaps = qp.caps()
124 self.caps = Caps(
125 qgisPath='',
126 sourceHash=qp.sourceHash,
127 layerMap={},
128 modelMap={},
129 copyDirs=[],
130 baseMapLayerIds=[],
131 areaOfInterest=None,
132 copyOnlyAreaOfInterest=False,
133 projectProps=self.extract_project_props(),
134 )
136 if self.qfcProject.qgisProvider.store.type == gws.plugin.qgis.project.StoreType.file:
137 self.caps.qgisPath = self.qfcProject.qgisProvider.store.path
139 self.parse_area_of_interest()
140 self.parse_copy_dirs()
141 self.parse_base_map()
143 self.iter_layers()
145 return self.caps
147 ##
149 def parse_area_of_interest(self):
150 aoi = self.caps.projectProps.areaOfInterest
151 if not aoi:
152 return
153 crs = self.caps.projectProps.areaOfInterestCrs
154 shape = gws.base.shape.from_wkt(aoi, gws.lib.crs.get(crs) or self.qgisCaps.projectCrs)
155 self.caps.areaOfInterest = shape.bounds()
156 self.caps.copyOnlyAreaOfInterest = self.caps.projectProps.offlineCopyOnlyAoi is True
158 def parse_copy_dirs(self):
159 raw_dirs = []
161 # dirsToCopy is a dict (dirname: bool)
162 dc = self.caps.projectProps.dirsToCopy or {}
163 for k, v in dc.items():
164 if v:
165 raw_dirs.append(k)
167 # attachmentDirs and dataDirs are lists
168 raw_dirs.extend(self.caps.projectProps.attachmentDirs or [])
169 raw_dirs.extend(self.caps.projectProps.dataDirs or [])
171 abs_dirs = []
173 for p in raw_dirs:
174 if not p.startswith('/'):
175 if not self.caps.qgisPath:
176 gws.log.warning(f'cannot determine an absolute path for {p!r}')
177 continue
178 p = gws.lib.osx.abs_path(p, self.caps.qgisPath)
179 abs_dirs.append(p)
181 unnest_dirs = []
183 for p in sorted(abs_dirs):
184 if any(p.startswith(d) for d in unnest_dirs):
185 continue
186 unnest_dirs.append(p)
188 self.caps.copyDirs = unnest_dirs
190 def parse_base_map(self):
191 if not self.caps.projectProps.createBaseMap:
192 return
194 bt = self.caps.projectProps.baseMapType
196 if bt == 'mapTheme':
197 theme = self.caps.projectProps.baseMapTheme
198 if not theme:
199 gws.log.warning(f'map theme not defined')
200 return
202 vp = self.qgisCaps.visibilityPresets.get(theme)
203 if not vp:
204 gws.log.warning(f'map theme {theme!r} not found')
205 return
206 self.caps.baseMapLayerIds = vp
207 return
209 if bt == 'singleLayer':
210 uid = self.caps.projectProps.baseMapLayer
211 if uid:
212 self.caps.baseMapLayerIds = [uid]
214 ##
216 def iter_layers(self):
217 for sl in gws.gis.source.filter_layers(self.qgisCaps.sourceLayers, is_group=False):
218 le = self.layer_entry(sl)
219 if le:
220 self.caps.layerMap[le.qgisId] = le
222 def layer_entry(self, sl: gws.SourceLayer) -> Optional[LayerEntry]:
223 le = self.layer_entry_2(sl)
224 if not le:
225 return
226 le.qgisId = sl.sourceId
227 le.sourceLayer = sl
229 return le
231 def layer_entry_2(self, sl: gws.SourceLayer) -> Optional[LayerEntry]:
232 props = self.extract_layer_props(sl)
234 if sl.sourceId in self.caps.baseMapLayerIds:
235 return LayerEntry(action=LayerAction.baseMap, props=props)
237 # 'offline', 'no_action' or 'remove'
238 # for "cable", use "action" instead of "cloud_action"
239 act = props.cloud_action
241 if act == 'remove':
242 return LayerEntry(action=LayerAction.remove, props=props)
244 if act == 'offline':
245 prov = sl.dataSource.get('provider')
246 if prov == 'postgres':
247 return self.postgres_layer_entry(sl, props)
248 # @TODO support offline for other providers?
249 gws.log.warning(f'layer {sl.sourceId!r}: offline editing of {prov!r} not supported')
250 return LayerEntry(action=LayerAction.remove, props=props)
252 def postgres_layer_entry(self, sl, props: LayerProps) -> LayerEntry:
253 read_only = (
254 props.is_attribute_editing_locked
255 and props.is_geometry_editing_locked
256 and props.is_feature_addition_locked
257 and props.is_feature_deletion_locked
258 )
260 table_name = sl.dataSource.get('table')
261 if not table_name or table_name.startswith('(') or table_name.upper().startswith('SELECT '):
262 gws.log.warning(f'layer {sl.sourceId!r}: no table name')
263 return LayerEntry(action=LayerAction.remove, props=props)
265 return LayerEntry(
266 action=LayerAction.edit,
267 readOnly=read_only,
268 sqlFilter=sl.dataSource.get('sql', ''),
269 props=props,
270 )
272 ##
274 def extract_project_props(self) -> ProjectProps:
275 d = {}
276 # there are two of them, QFieldSync and libqfieldsync
277 d.update(self.qgisCaps.properties.get('qfieldsync', {}))
278 d.update(self.qgisCaps.properties.get('QFieldSync', {}))
280 t = ProjectProps()
281 _dict_to_data(d, t)
282 return t
284 def extract_layer_props(self, sl: gws.SourceLayer) -> LayerProps:
285 d = {}
287 for k, v in sl.properties.items():
288 if k.startswith('QFieldSync/'):
289 d[k.split('/').pop()] = v
291 t = LayerProps()
292 _dict_to_data(d, t)
293 return t
295 ##
297 def assign_path_props(self):
298 for le in self.caps.layerMap.values():
299 if le.action == LayerAction.edit:
300 name = le.modelEntry.gpName
301 le.dataSourceFileName = f'{name}.gpkg'
302 le.dataSource = f'./{le.dataSourceFileName}|layername={name}'
303 if le.sqlFilter:
304 le.dataSource += f'|subset={le.sqlFilter}'
305 le.dataProvider = 'ogr'
307 if le.action == LayerAction.baseMap:
308 u = gws.u.to_uid(le.qgisId)
309 le.dataSourceFileName = f'{u}.gpkg'
310 le.dataSource = f'./{le.dataSourceFileName}'
311 le.dataProvider = 'gdal'
313 def create_models(self):
314 self.caps.modelMap = {}
316 for le in self.caps.layerMap.values():
317 self.create_model_entry_for_layer(le)
319 def create_model_entry_for_layer(self, le: LayerEntry):
320 if le.action != LayerAction.edit:
321 return
322 me = self.model_entry_for_source_layer(le.sourceLayer)
323 if not me:
324 gws.log.warning(f'layer {le.qgisId!r}: no model')
325 le.action = LayerAction.remove
326 return
327 if not le.readOnly and not me.model.isEditable:
328 gws.log.warning(f'layer {le.qgisId!r}: table {me.tableName!r} is not editable')
329 le.action = LayerAction.remove
330 return
331 le.modelEntry = me
333 def model_entry_for_source_layer(self, sl: gws.SourceLayer) -> Optional[ModelEntry]:
334 table_name = sl.dataSource.get('table')
335 if not table_name:
336 return
338 for model in self.qfcProject.models:
339 full_name = model.db.join_table_name('', model.tableName)
340 if full_name == model.db.join_table_name('', table_name):
341 gp_name = self.gp_name_for_model(full_name)
342 if gp_name not in self.caps.modelMap:
343 self.caps.modelMap[gp_name] = ModelEntry(gpName=gp_name, tableName=full_name, model=model)
344 return self.caps.modelMap[gp_name]
346 db = self.qfcProject.qgisProvider.postgres_provider_from_datasource(sl.dataSource)
347 if not db.has_table(table_name):
348 gws.log.warning(f'layer {sl.sourceId!r}: table {table_name!r} not found')
349 return
351 gp_name = self.gp_name_for_model(table_name)
353 if gp_name not in self.caps.modelMap:
354 model = self.qfcProject.root.create_shared(
355 gws.ext.object.model,
356 gws.Config(
357 uid=f'qfield_model_{table_name}',
358 type='postgres',
359 # NB: permissions are checked in the public export/import functions above
360 permissions=gws.Config(read=gws.c.PUBLIC, edit=gws.c.PUBLIC),
361 tableName=table_name,
362 isEditable=True,
363 _defaultDb=db,
364 ),
365 )
366 self.caps.modelMap[gp_name] = ModelEntry(gpName=gp_name, tableName=table_name, model=cast(gws.DatabaseModel, model))
368 return self.caps.modelMap[gp_name]
370 def gp_name_for_model(self, table_name):
371 if '.' not in table_name:
372 table_name = 'public.' + table_name
373 return 'qm_' + table_name.replace('.', '_').lower()
376##
379def _dict_to_data(d: dict, t: gws.Data):
380 for k, typ in t.__class__.__annotations__.items():
381 v = d.get(k)
382 if v is None:
383 continue
384 try:
385 if typ is bool:
386 v = str(v) == '1'
387 elif typ is int:
388 v = int(v)
389 elif typ is float:
390 v = float(v)
391 elif typ is dict:
392 v = gws.lib.jsonx.from_string(v)
393 except Exception as exc:
394 gws.log.warning(f'invalid property value {k!r}={v!r}: {exc}')
395 continue
396 setattr(t, k, v)
397 return t