Coverage for gws-app/gws/base/ows/server/request.py: 79%

216 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""Service Request object.""" 

2 

3from typing import Optional, Callable, cast 

4import re 

5 

6import gws 

7import gws.base.layer.core 

8import gws.base.legend 

9import gws.base.model 

10import gws.base.web 

11import gws.lib.extent 

12import gws.gis.render 

13import gws.lib.mime 

14import gws.lib.bounds 

15import gws.lib.crs 

16import gws.lib.image 

17import gws.lib.uom 

18import gws.lib.xmlx 

19 

20from . import core, layer_caps, error 

21 

22 

class TemplateArgs(gws.TemplateArgs):
    """Values made available to OWS service templates."""

    featureCollection: core.FeatureCollection
    metadataCollection: core.MetadataCollection

    operation: gws.OwsOperation
    project: gws.Project

    # 'request' and 'sr' are both typed as the service request Object
    request: 'Object'
    layerCapsList: list[core.LayerCaps]
    sr: 'Object'

    service: gws.OwsService
    serviceUrl: str
    url_for: Callable

    gmlVersion: int
    version: str
    intVersion: int

    tileMatrixSets: list[gws.TileMatrixSet]

40 

41 

class Object:
    """Service request: the request parameters together with the service, project and layers they refer to."""

    # coordinate handling and raster geometry
    alwaysXY: bool
    bounds: gws.Bounds
    crs: gws.Crs
    params: dict
    pxSize: gws.Size
    resolution: float
    resX: float
    resY: float

    isSoap: bool = False

    # objects the request is bound to
    layerCapsList: list[core.LayerCaps]
    operation: gws.OwsOperation
    project: gws.Project
    req: gws.WebRequester
    service: gws.OwsService
    targetCrs: gws.Crs
    version: str
    xmlElement: Optional[gws.XmlElement]
    customXmlns: dict

61 

def __init__(
    self,
    service: gws.OwsService,
    req: gws.WebRequester,
    params: dict,
    xml_element: Optional[gws.XmlElement] = None,
    is_soap: bool = False,
) -> None:
    """Initialize the request state from the incoming parameters.

    Args:
        service: The OWS service that handles the request.
        req: The web requester.
        params: Request parameters; keys are normalized with ``gws.u.to_upper_dict``.
        xml_element: XML element of the request, if any; stored as ``xmlElement``.
        is_soap: Stored as ``isSoap``.

    Raises:
        error.OperationNotSupported: ``REQUEST`` does not name a supported operation.
        error.VersionNegotiationFailed: No supported version matches the requested one.
        error.CurrentUpdateSequence: ``UPDATESEQUENCE`` equals the service's update sequence.
        error.InvalidUpdateSequence: ``UPDATESEQUENCE`` is greater than the service's update sequence.
        error.InvalidParameterValue: ``NAMESPACES`` contains an unknown namespace URI.
    """
    self.service = service
    self.req = req
    # the project is resolved later by load_project()
    self.project = cast(gws.Project, None)
    self.params = gws.u.to_upper_dict(params)
    self.xmlElement = xml_element
    self.isSoap = is_soap

    self.operation = self.requested_operation('REQUEST')
    self.version = self.requested_version('VERSION,ACCEPTVERSIONS')

    # NB: 'isSoap' must not be reset here, otherwise the 'is_soap' argument is lost
    self.alwaysXY = False
    self.pxSize = 0, 0
    self.resolution = 0
    self.resX = 0
    self.resY = 0

    # OGC 06-042, 7.2.3.5
    if self.service.updateSequence:
        s = self.string_param('UPDATESEQUENCE', default='')
        if s and s == self.service.updateSequence:
            raise error.CurrentUpdateSequence()
        if s and s > self.service.updateSequence:
            raise error.InvalidUpdateSequence()

    self.customXmlns = self.requested_xmlns_replacements()

96 

def require_project(self):
    """Load the project for this request; shorthand for ``load_project(required=True)``."""
    result = self.load_project(required=True)
    return result

99 

def load_project(self, required=False):
    """Determine the project of this request and set ``project`` and ``layerCapsList``.

    A service is either configured globally (``service.project`` is empty) and
    addressed with the ``projectUid`` request parameter, or bound to a single
    project (``service.project`` is set).

    Args:
        required: If true, raise when no project can be determined.

    Raises:
        gws.NotFoundError: ``projectUid`` does not match the project of the service,
            or no project was found and ``required`` is set.
    """
    p = self.req.param('projectUid')
    own_project = self.service.project

    if p:
        found = self.req.user.require_project(p)
        if own_project and found != own_project:
            raise gws.NotFoundError(f'ows {self.service.uid}: wrong project={p!r}')
    elif own_project:
        # for in-project services, ensure the user can access the project
        found = self.req.user.require_project(own_project.uid)
    else:
        found = None

    if not found:
        if required:
            raise gws.NotFoundError(f'ows {self.service.uid}: project not found')
        return None

    self.project = found

    # the key depends on the service, the project and the user's roles
    key_parts = [self.service.uid, self.project.uid, sorted(self.req.user.roles)]
    self.layerCapsList = gws.u.get_app_global(
        'layer_caps_' + gws.u.sha256(key_parts),
        self.enum_layer_caps,
    )

124 

def enum_layer_caps(self):
    """Return the list of layer caps, starting at the service root layer or, if there is none, the project map root layer."""
    collected = []
    start_layer = self.service.rootLayer or self.project.map.rootLayer
    self._enum_layer_caps(start_layer, collected, [])
    return collected

130 

def _enum_layer_caps(self, layer: gws.Layer, lcs: list[core.LayerCaps], stack: list[core.LayerCaps]):
    """Recursively collect layer caps for ``layer`` and its sub-layers.

    Args:
        layer: The layer to inspect.
        lcs: Output list; caps of accepted layers are added here, a group before its descendants.
        stack: Caps of the groups enclosing ``layer``, outermost first.
    """
    if not (self.req.user.can_read(layer) and layer.isEnabledForOws):
        return

    ows = layer.ows
    if ows:
        service_uid = self.service.uid
        if ows.allowedServiceUids and service_uid not in ows.allowedServiceUids:
            return
        if ows.deniedServiceUids and service_uid in ows.deniedServiceUids:
            return

    is_compat = self.service.layer_is_compatible(layer)
    if not (is_compat or layer.isGroup):
        return

    lc = layer_caps.for_layer(layer, self.req.user, self.service)

    if not layer.isGroup:
        lc.isGroup = False
        lcs.append(lc)
        # a leaf is registered with every enclosing group
        for ancestor in stack:
            ancestor.leaves.append(lc)
    else:
        # NB groups must be inspected even if not 'compatible'
        lc.isGroup = True
        position = len(lcs)
        nested_stack = [*stack, lc]
        for sub_layer in layer.layers:
            self._enum_layer_caps(sub_layer, lcs, nested_stack)
        if not lc.children:
            # no empty groups
            return
        if is_compat:
            lc.hasLegend = any(c.hasLegend for c in lc.children)
            lc.isSearchable = any(c.isSearchable for c in lc.children)
        # place the group in front of the descendants collected above
        lcs.insert(position, lc)

    if stack:
        stack[-1].children.append(lc)

168 

169 ## 

170 

def requested_version(self, param_names: str) -> str:
    """Return the supported version selected by the request.

    Args:
        param_names: Comma-separated names of the parameters to look at.

    Returns:
        The first supported version if no version was requested, otherwise the
        first supported version that starts with one of the requested values.

    Raises:
        error.VersionNegotiationFailed: No supported version matches.
    """
    _, requested = self._get_param(param_names, '')
    supported = self.service.supportedVersions

    if not requested:
        # the first supported version is the default
        return supported[0]

    for prefix in gws.u.to_list(requested):
        matched = next((ver for ver in supported if ver.startswith(prefix)), None)
        if matched is not None:
            return matched

    raise error.VersionNegotiationFailed()

183 

# maps a lowercase REQUEST value to its verb; each key is the lowercase verb name
_param2verb = {
    verb_name.lower(): getattr(gws.OwsVerb, verb_name)
    for verb_name in (
        'CreateStoredQuery',
        'DescribeCoverage',
        'DescribeFeatureType',
        'DescribeLayer',
        'DescribeRecord',
        'DescribeStoredQueries',
        'DropStoredQuery',
        'GetCapabilities',
        'GetFeature',
        'GetFeatureInfo',
        'GetFeatureWithLock',
        'GetLegendGraphic',
        'GetMap',
        'GetPrint',
        'GetPropertyValue',
        'GetRecordById',
        'GetRecords',
        'GetTile',
        'ListStoredQueries',
        'LockFeature',
        'Transaction',
    )
}

207 

def requested_operation(self, param_names: str) -> gws.OwsOperation:
    """Return the service operation named by the request.

    Args:
        param_names: Comma-separated names of the parameters to look at.

    Raises:
        error.OperationNotSupported: The value does not name a supported operation.
    """
    _, name = self._get_param(param_names, '')
    operation = self.find_operation(name)
    if not operation:
        raise error.OperationNotSupported(name)
    return operation

214 

def find_operation(self, param: str) -> Optional[gws.OwsOperation]:
    """Return the supported operation whose verb matches ``param`` (case-insensitive), or ``None``."""
    wanted = self._param2verb.get(param.lower())
    if not wanted:
        return None

    candidates = (op for op in self.service.supportedOperations if op.verb == wanted)
    return next(candidates, None)

223 

def requested_crs(self, param_names: str) -> Optional[gws.Crs]:
    """Return the requested CRS, or ``None`` if the parameter is absent or empty.

    Args:
        param_names: Comma-separated names of the parameters to look at.

    Raises:
        error.InvalidCRS: The value is not a known CRS or is not among the
            CRS of the service's supported bounds.
    """
    _, name = self._get_param(param_names, '')
    if not name:
        return None

    crs = gws.lib.crs.get(name)
    if crs and any(crs == b.crs for b in self.service.supportedBounds):
        return crs

    raise error.InvalidCRS()

238 

def requested_bounds(self, param_names: str) -> Optional[gws.Bounds]:
    """Return the requested bounding box transformed to ``self.crs``, or ``None`` if absent.

    Args:
        param_names: Comma-separated names of the parameters to look at.

    Raises:
        error.InvalidParameterValue: The value cannot be parsed as a bounding box.
    """
    # OGC 06-042, 7.2.3.5
    # OGC 00-028, 6.2.8.2.3

    p, val = self._get_param(param_names, '')
    if not val:
        return None

    parsed = gws.lib.bounds.from_request_bbox(val, default_crs=self.crs, always_xy=self.alwaysXY)
    if not parsed:
        raise error.InvalidParameterValue(p)

    return gws.lib.bounds.transform(parsed, self.crs)

252 

def requested_format(self, param_names: str) -> str:
    """Return the requested format with all whitespace removed, or an empty string if absent."""
    _, val = self._get_param(param_names, '')
    # NB our mime types do not contain spaces
    return ''.join(val.split()) if val else ''

259 

def requested_feature_count(self, param_names: str) -> int:
    """Return the requested feature count, capped at the service maximum.

    A missing or non-positive value yields the service default.
    """
    requested = self.int_param(param_names, default=0)
    if requested > 0:
        return min(self.service.maxFeatureCount, requested)
    return self.service.defaultFeatureCount

265 

def requested_xmlns_replacements(self):
    """Parse the ``NAMESPACES`` parameter into a dict of namespace uid -> requested prefix.

    Namespaces whose requested prefix equals their own ``xmlns`` are left out.

    Raises:
        error.InvalidParameterValue: A listed URI does not belong to a known namespace.
    """
    replacements = {}

    spec = self.string_param('NAMESPACES', default='')
    if not spec:
        return replacements

    # OGC 09-025r1, Table 7
    # xmlns(xml,http://www.w3.org/XML/1998/namespace),xmlns(ns37,https://our-ns),xmlns(wfs ... etc

    for m in re.finditer(r'xmlns\((.+?),(.+?)\)', spec):
        xmlns, uri = m.groups()
        ns = gws.lib.xmlx.namespace.find_by_uri(uri)
        if not ns:
            gws.log.debug(f'namespace not found: {uri=}')
            raise error.InvalidParameterValue('NAMESPACES')
        if ns.xmlns != xmlns:
            replacements[ns.uid] = xmlns

    return replacements

286 

287 ## 

288 

def _get_param(self, param_names, default):
    """Look up the first of the given parameters that is present in the request.

    Args:
        param_names: Comma-separated parameter names (upper-cased before lookup).
        default: Value to return if none of the names is present;
            ``None`` means the parameter is mandatory.

    Returns:
        A tuple ``(name, value)``; the name is empty if the default was used.

    Raises:
        error.MissingParameterValue: No name is present and ``default`` is ``None``.
    """
    names = gws.u.to_list(param_names.upper())

    for p in names:
        if p in self.params:
            return p, self.params[p]

    if default is None:
        raise error.MissingParameterValue(names[0])

    return '', default

302 

def string_param(self, param_names: str, values: Optional[set[str]] = None, default: Optional[str] = None) -> str:
    """Return a string parameter, optionally restricted to a set of values.

    Args:
        param_names: Comma-separated names of the parameters to look at.
        values: If given, the lower-cased value must be in this set, and the
            lower-cased value is returned.
        default: Value to use if the parameter is absent; ``None`` makes it mandatory.

    Raises:
        error.MissingParameterValue: The parameter is absent and there is no default.
        error.InvalidParameterValue: The value is not in ``values``.
    """
    p, val = self._get_param(param_names, default)
    if not values:
        return val

    lowered = val.lower()
    if lowered in values:
        return lowered

    raise error.InvalidParameterValue(p)

310 

def list_param(self, param_names: str) -> list[str]:
    """Return a parameter converted with ``gws.u.to_list``; an absent parameter is treated as an empty string."""
    raw = self._get_param(param_names, '')[1]
    return gws.u.to_list(raw)

314 

def int_param(self, param_names: str, default: Optional[int] = None) -> int:
    """Return an integer parameter.

    Args:
        param_names: Comma-separated names of the parameters to look at.
        default: Value to use if the parameter is absent; ``None`` makes it mandatory.

    Raises:
        error.MissingParameterValue: The parameter is absent and there is no default.
        error.InvalidParameterValue: The value cannot be converted to an integer.
    """
    p, val = self._get_param(param_names, default)
    try:
        number = int(val)
    except ValueError:
        raise error.InvalidParameterValue(p)
    return number