Coverage for gws-app/gws/base/ows/client/featureinfo.py: 98%

124 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 23:09 +0200

1"""Parse WMS/WFS FeatureInfo responses.""" 

2 

3import gws 

4import gws.base.shape 

5import gws.lib.gml 

6import gws.lib.xmlx as xmlx 

7 

8 

class Error(gws.Error):
    """Raised by this module when a FeatureInfo response cannot be parsed."""

11 

12 

def parse(text: str, default_crs: gws.Crs = None, always_xy=False) -> list[gws.FeatureRecord]:
    """Parse a WMS/WFS FeatureInfo response into feature records.

    Args:
        text: Raw response body. Leading and trailing whitespace is removed
            before the format is detected.
        default_crs: Passed through unchanged to the format-specific parser.
        always_xy: Passed through unchanged to the format-specific parser.

    Returns:
        The list of parsed records; an empty list for a blank response.

    Raises:
        Error: If the response is not XML, is malformed XML, or has a root
            element for which no parser is registered.
    """
    gws.debug.time_start('featureinfo:parse')
    try:
        return _parse(text.strip(), default_crs, always_xy)
    finally:
        # Pair every time_start with a time_end, also when parsing raises.
        gws.debug.time_end()

18 

19 

def _parse(text, default_crs, always_xy):
    """Detect the response format and delegate to the matching parser.

    Blank input yields an empty list. Anything that does not start with ``<``
    is rejected. XML input is parsed with the ``removeNamespaces`` option and
    the parser is looked up in ``_XML_FORMATS`` by the root element's
    ``lcName``.

    Raises:
        Error: For non-XML input, XML that fails to parse, or a root element
            without a registered parser.
    """
    if not text.strip():
        return []

    if not text.startswith('<'):
        raise Error(f'unknown format in {text[:100]!r}')

    try:
        root = xmlx.from_string(text, gws.XmlOptions(removeNamespaces=True))
    except xmlx.Error as exc:
        raise Error('XML error') from exc

    handler = _XML_FORMATS.get(root.lcName)
    if not handler:
        raise Error(f'XML format error for {root.name!r}')

    records = handler(root, default_crs, always_xy)
    gws.log.debug(f'parsed with {handler.__name__} count={len(records)}')
    return records

39 

40 

41## 

42 

43 

def _parse_msgmloutput(xml_el: gws.XmlElement, default_crs, always_xy):
    """Parse an ``msGMLOutput`` document (MapServer).

    Expected layout::

        <msGMLOutput
            <LAYER_1>
                <gml:name>LAYER_NAME
                <FEATURE_1>
                    <gml:boundedBy>
                        ...
                    </gml:boundedBy>
                    <GEOMETRY>
                        <gml:Point...
                    </GEOMETRY>
                    <attr>....</attr>
                    <attr>....</attr>

    Every child of a layer element other than ``name`` is read as a feature.
    """
    records = []

    for layer in xml_el:
        # The element name is used until a <name> child supplies another one.
        current_name = layer.lcName

        for child in layer:
            if child.lcName == 'name':
                current_name = child.text
                continue

            record = _record_from_gml(child, default_crs, always_xy)
            record.meta = {'layerName': current_name}
            records.append(record)

    return records

74 

75 

def _parse_featurecollection(xml_el: gws.XmlElement, default_crs, always_xy):
    """Parse a ``FeatureCollection`` document (OGC).

    Expected layout::

        <FeatureCollection
            <wfs:member>
                <FEATURE gml:id=...
                    <attr>....</attr>
                    <attr> <nested>....</attr>
                    <GEOMETRY>
                        <gml:Point...

    Only ``member`` and ``featureMember`` children are considered. A member
    with a single non-empty child is read from that child; a member with
    several children is read as the feature itself; any other member is
    skipped.
    """
    member_tags = {'member', 'featuremember'}
    records = []

    for member in xml_el:
        if member.lcName not in member_tags:
            continue

        child_count = len(member)
        if child_count == 1 and len(member[0]) > 0:
            # <wfs:member><my:feature><attr...
            feature_el = member[0]
        elif child_count > 1:
            # <wfs:member><attr...
            feature_el = member
        else:
            continue

        records.append(_record_from_gml(feature_el, default_crs, always_xy))

    return records

100 

101 

def _parse_getfeatureinforesponse(xml_el: gws.XmlElement, default_crs, always_xy):
    """Parse a ``GetFeatureInfoResponse`` document (geoserver/qgis).

    Expected layout::

        <GetFeatureInfoResponse>
            <Layer name="....">
                <Feature id="...">
                    <Attribute name="..." value="..."/>
                    <Attribute name="geometry" value="<wkt>"/>

    For qgis raster layers, "Attribute" is directly under "Layer"::

        <GetFeatureInfoResponse>
            <Layer name="....">
                <Attribute name="..." value="..."/>

    Attributes found directly under a layer are collected into one extra
    record per layer, appended after that layer's features and only when it
    ended up with at least one attribute.
    """

    def apply_attribute(record, attribute_el):
        # 'geometry' carries WKT and becomes the shape; empty values are dropped.
        name = attribute_el.get('name').lower()
        value = attribute_el.get('value', '').strip()
        if name == 'geometry':
            record.shape = gws.base.shape.from_wkt(value, default_crs)
            return
        if value:
            record.attributes[name] = value

    records = []

    for layer in xml_el:
        layer_title = layer.get('name')

        layer_record = gws.FeatureRecord(
            attributes={},
            uid='',
            meta={'layerName': layer_title},
        )

        for child in layer:
            tag = child.lcName

            if tag == 'feature':
                feature_record = gws.FeatureRecord(
                    attributes={},
                    uid=_get_uid(child),
                    meta={'layerName': layer_title},
                )
                for item in child:
                    if item.lcName == 'attribute':
                        apply_attribute(feature_record, item)
                records.append(feature_record)
            elif tag == 'attribute':
                apply_attribute(layer_record, child)

        if layer_record.attributes:
            records.append(layer_record)

    return records

155 

156 

def _parse_featureinforesponse(xml_el: gws.XmlElement, default_crs, always_xy):
    """Parse a ``FeatureInfoResponse`` document (Arcgis).

    https://webhelp.esri.com/arcims/9.3/General/mergedProjects/wms_connect/wms_connector/get_featureinfo.htm

    Expected layout::

        <FeatureInfoResponse...
            <fields objectid="15111" shape="polygon"...
            <fields objectid="15111" shape="polygon"...

    Each ``fields`` element becomes one record whose attributes are the
    element's XML attributes with lower-cased names, except ``shape``.
    ``default_crs`` and ``always_xy`` are not used by this parser.
    """
    records = []

    for child in xml_el:
        if child.lcName != 'fields':
            continue

        attributes = {
            name.lower(): value
            for name, value in child.attrib.items()
            if name.lower() != 'shape'
        }
        records.append(gws.FeatureRecord(attributes=attributes, uid=_get_uid(child)))

    return records

181 

182 

def _parse_geobak(xml_el: gws.XmlElement, default_crs, always_xy):
    """Parse a GeoBAK ``Sachdatenabfrage`` document.

    GeoBAK (https://www.egovernment.sachsen.de/geodaten.html)

    Expected layout::

        <geobak_20:Sachdatenabfrage...
            <geobak_20:Kartenebene>....
            <geobak_20:Inhalt>
                <geobak_20:Datensatz>
                    <geobak_20:Attribut>
                        <geobak_20:Name>...
                        <geobak_20:Wert>...
            <geobak_20:Inhalt>
                <geobak_20:Datensatz>
                    ...

    One record is produced per ``Inhalt`` element, built from the children of
    its first child element. Attributes named ``shape`` and values equal to
    ``null`` (case-insensitive) are left out. The text of the most recent
    ``Kartenebene`` element is stored as ``meta['layerName']`` of the records
    that follow it. ``default_crs`` and ``always_xy`` are not used by this
    parser.
    """
    recs = []

    # Kartenebene is a sibling of the Inhalt elements, so its text has to be
    # remembered across iterations to reach the records built later.
    layer_name = None

    for el in xml_el:
        if el.lcName == 'kartenebene':
            layer_name = el.text
            continue

        if el.lcName != 'inhalt':
            continue

        rec = gws.FeatureRecord(attributes={})
        if layer_name is not None:
            rec.meta = {'layerName': layer_name}

        # An Inhalt without a child has no attributes to read.
        if len(el) > 0:
            for attr_el in el[0]:
                # A name/value pair needs both children.
                if len(attr_el) < 2:
                    continue
                key = attr_el[0].text.strip().lower()
                val = attr_el[1].text.strip()
                if key != 'shape' and val.lower() != 'null':
                    rec.attributes[key] = val

        recs.append(rec)

    return recs

216 

217 

218## 

219 

# Joins the name of a nested element with the names of its own attributes.
_DEEP_ATTRIBUTE_DELIMITER = '.'


def _record_from_gml(feature_el, default_crs, always_xy) -> gws.FeatureRecord:
    """Build a feature record from a GML feature element.

    Like GDAL does:
    "When reading a feature, the driver will by default only take into account
    the last recognized GML geometry found..." (https://gdal.org/drivers/vector/gml.html)

    Children are classified in this order: ``boundedBy``, a geometry element,
    a wrapper around a single geometry element, an element with children
    (read recursively, its attributes stored under
    ``<element name>.<attribute name>``), and finally a plain text attribute.
    If no geometry was found, the ``boundedBy`` envelope is used as the shape.
    """
    gml = gws.lib.gml

    record = gws.FeatureRecord(
        attributes={},
        uid=_get_uid(feature_el),
        meta={'layerName': feature_el.lcName},
    )

    envelope = None

    for child in feature_el:
        tag = child.lcName

        if tag == 'boundedby':
            # <gml:boundedBy directly under feature
            envelope = gml.parse_envelope(child[0], default_crs, always_xy)
            continue

        if gml.is_geometry_element(child):
            # <gml:Polygon etc directly under feature
            record.shape = gml.parse_shape(child, default_crs, always_xy)
            continue

        if len(child) == 1 and gml.is_geometry_element(child[0]):
            # <gml:Polygon etc in a wrapper tag
            record.shape = gml.parse_shape(child[0], default_crs, always_xy)
            continue

        if len(child) > 0:
            # sub-feature: only its attributes are taken over
            prefix = tag + _DEEP_ATTRIBUTE_DELIMITER
            nested = _record_from_gml(child, default_crs, always_xy)
            for name, value in nested.attributes.items():
                record.attributes[prefix + name] = value
            continue

        # attribute <attr>text</attr>
        content = child.text.strip()
        if content:
            record.attributes[tag] = content

    if not record.shape and envelope:
        record.shape = gws.base.shape.from_bounds(envelope)

    return record

261 

262 

# Attribute names that may hold a feature id, in order of precedence.
_UIDS = ['id', 'fid', 'objectid', 'ID', 'FID', 'OBJECTID']


def _get_uid(el):
    """Return the value of the first ``_UIDS`` attribute present on ``el``.

    Names are matched exactly as spelled in ``_UIDS``. Returns an empty
    string when the element carries none of them.
    """
    found = next((name for name in _UIDS if name in el.attrib), None)
    return '' if found is None else el.get(found)

271 

272 

273## 

274 

275 

# Parser lookup table, keyed by the lower-cased name of the root element.
_XML_FORMATS = dict(
    msgmloutput=_parse_msgmloutput,
    featurecollection=_parse_featurecollection,
    getfeatureinforesponse=_parse_getfeatureinforesponse,
    featureinforesponse=_parse_featureinforesponse,
    sachdatenabfrage=_parse_geobak,
)