Coverage for gws-app/gws/plugin/ows_client/wfs/provider.py: 0%

69 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""WFS provider. 

2 

3References: 

4 

5- wfs 1.0.0: http://portal.opengeospatial.org/files/?artifact_id=7176 Sec 13.7.3 

6- wfs 1.1.0: http://portal.opengeospatial.org/files/?artifact_id=8339 Sec 14.7.3 

7- wfs 2.0.0: http://docs.opengeospatial.org/is/09-025r2/09-025r2.html Sec 11.1.3 

8 

9See also: 

10 

11- https://docs.geoserver.org/latest/en/user/services/wfs/reference.html 

12 

13""" 

14 

15from typing import Optional, cast 

16 

17import gws 

18import gws.base.ows.client 

19import gws.base.shape 

20import gws.config.util 

21import gws.lib.bounds 

22import gws.lib.crs 

23import gws.lib.extent 

24import gws.gis.source 

25 

26from . import caps 

27 

28 

29class Config(gws.base.ows.client.provider.Config): 

30 """WFS provider configuration.""" 

31 

32 withBboxCrs: Optional[bool] 

33 """Add CRS to bbox request parameters.""" 

34 

35 

36class Object(gws.base.ows.client.provider.Object): 

37 protocol = gws.OwsProtocol.WFS 

38 withBboxCrs: bool 

39 isWfs2: bool 

40 

41 def configure(self): 

42 cc = caps.parse(self.get_capabilities()) 

43 

44 self.metadata = cc.metadata 

45 self.sourceLayers = cc.sourceLayers 

46 self.version = cc.version 

47 self.isWfs2 = self.version >= '2' 

48 

49 self.configure_operations(cc.operations) 

50 

51 # use bbox with crs for wfs 2 by default 

52 # see also comments in qgis/qgswfsfeatureiterator.cpp buildURL 

53 p = self.cfg('withBboxCrs') 

54 self.withBboxCrs = self.isWfs2 if p is None else p 

55 

56 DEFAULT_GET_FEATURE_LIMIT = 100 

57 

58 def get_features(self, search, source_layers): 

59 """Perform the WFS GetFeature operation. 

60 

61 We only do spatial searches here. 

62 If no bounds and no shapes are given, return all features. 

63 If a shape is given, find features within its bounds first, 

64 and filter features on our side. 

65 This is more performant than WFS spatial ops (at least for qgis), 

66 and also works without spatial ops support on the provider side. 

67 """ 

68 

69 bounds = search.bounds 

70 search_shape = None 

71 

72 if search.shape: 

73 geometry_tolerance = 0.0 

74 

75 if search.tolerance: 

76 n, u = search.tolerance 

77 geometry_tolerance = n * (search.resolution or 1) if u == 'px' else n 

78 

79 search_shape = search.shape.tolerance_polygon(geometry_tolerance) 

80 bounds = search_shape.bounds() 

81 

82 request_crs = self.forceCrs or gws.lib.crs.WGS84 

83 

84 bbox = gws.lib.bounds.transform(bounds, request_crs).extent 

85 if request_crs.isYX and not self.alwaysXY: 

86 bbox = gws.lib.extent.swap_xy(bbox) 

87 bbox = ','.join(str(k) for k in bbox) 

88 

89 srs = request_crs.urn 

90 if self.withBboxCrs: 

91 bbox += ',' + srs 

92 

93 params = { 

94 'BBOX': bbox, 

95 'COUNT' if self.isWfs2 else 'MAXFEATURES': search.limit or self.DEFAULT_GET_FEATURE_LIMIT, 

96 'SRSNAME': srs, 

97 'TYPENAMES' if self.isWfs2 else 'TYPENAME': [sl.name for sl in source_layers], 

98 'VERSION': self.version, 

99 } 

100 

101 if search.extraParams: 

102 params = gws.u.merge(params, gws.u.to_upper_dict(search.extraParams)) 

103 

104 op = self.get_operation(gws.OwsVerb.GetFeature) 

105 if not op: 

106 return [] 

107 

108 if op.preferredFormat: 

109 params.setdefault('OUTPUTFORMAT', op.preferredFormat) 

110 

111 args = self.prepare_operation(op, params=params) 

112 text = gws.base.ows.client.request.get_text(args) 

113 

114 try: 

115 records = gws.base.ows.client.featureinfo.parse(text, default_crs=request_crs, always_xy=self.alwaysXY) 

116 except gws.Error as exc: 

117 gws.log.error(f'get_features: parse error: {exc!r}') 

118 return [] 

119 

120 gws.log.debug(f'get_features: FOUND={len(records)} params={params!r}') 

121 

122 for rec in records: 

123 if rec.shape: 

124 rec.shape = rec.shape.transformed_to(bounds.crs) 

125 

126 if not search_shape: 

127 return records 

128 

129 filtered = [ 

130 rec for rec in records 

131 if not rec.shape or rec.shape.intersects(search_shape) 

132 ] 

133 

134 gws.log.debug(f'get_features: FILTERED={len(filtered)}') 

135 return filtered