Coverage for gws-app/gws/base/ows/server/request.py: 79%
216 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""Service Request object."""
3from typing import Optional, Callable, cast
4import re
6import gws
7import gws.base.layer.core
8import gws.base.legend
9import gws.base.model
10import gws.base.web
11import gws.lib.extent
12import gws.gis.render
13import gws.lib.mime
14import gws.lib.bounds
15import gws.lib.crs
16import gws.lib.image
17import gws.lib.uom
18import gws.lib.xmlx
20from . import core, layer_caps, error
class TemplateArgs(gws.TemplateArgs):
    """Argument set handed to OWS service templates."""

    # collections to be rendered
    featureCollection: core.FeatureCollection
    metadataCollection: core.MetadataCollection

    # request context
    operation: gws.OwsOperation
    project: gws.Project
    request: 'Object'
    layerCapsList: list[core.LayerCaps]
    # NOTE(review): same type as `request`; presumably a short alias - confirm against the templates
    sr: 'Object'
    service: gws.OwsService
    serviceUrl: str
    url_for: Callable

    # version information
    gmlVersion: int
    version: str
    intVersion: int

    tileMatrixSets: list[gws.TileMatrixSet]
class Object:
    """Service request object.

    Bundles the OWS service, the web request, the upper-cased request
    parameters and the per-request state derived from them.
    """

    # NOTE(review): `bounds`, `crs` and `targetCrs` are declared here but are not
    # assigned in the visible part of this class; presumably the service
    # handlers set them - confirm.
    alwaysXY: bool
    bounds: gws.Bounds
    crs: gws.Crs
    # request parameters, keys upper-cased by the constructor
    params: dict
    pxSize: gws.Size
    resolution: float
    resX: float
    resY: float
    isSoap: bool = False
    # populated by `load_project`
    layerCapsList: list[core.LayerCaps]
    operation: gws.OwsOperation
    project: gws.Project
    req: gws.WebRequester
    service: gws.OwsService
    targetCrs: gws.Crs
    version: str
    xmlElement: Optional[gws.XmlElement]
    # result of `requested_xmlns_replacements`
    customXmlns: dict
62 def __init__(
63 self,
64 service: gws.OwsService,
65 req: gws.WebRequester,
66 params: dict,
67 xml_element: gws.XmlElement = None,
68 is_soap=False,
69 ) -> None:
70 self.service = service
71 self.req = req
72 self.project = cast(gws.Project, None)
73 self.params = gws.u.to_upper_dict(params)
74 self.xmlElement = xml_element
75 self.isSoap = is_soap
77 self.operation = self.requested_operation('REQUEST')
78 self.version = self.requested_version('VERSION,ACCEPTVERSIONS')
80 self.alwaysXY = False
81 self.isSoap = False
82 self.pxSize = 0, 0
83 self.resolution = 0
84 self.resX = 0
85 self.resY = 0
87 # OGC 06-042, 7.2.3.5
88 if self.service.updateSequence:
89 s = self.string_param('UPDATESEQUENCE', default='')
90 if s and s == self.service.updateSequence:
91 raise error.CurrentUpdateSequence()
92 if s and s > self.service.updateSequence:
93 raise error.InvalidUpdateSequence()
95 self.customXmlns = self.requested_xmlns_replacements()
97 def require_project(self):
98 return self.load_project(required=True)
100 def load_project(self, required=False):
101 # services can be configured globally (in which case, service.project == None)
102 # and applied to multiple projects with the projectUid param
103 # or, configured just for a single project (service.project != None)
105 p = self.req.param('projectUid')
106 project = None
108 if p:
109 project = self.req.user.require_project(p)
110 if self.service.project and project != self.service.project:
111 raise gws.NotFoundError(f'ows {self.service.uid}: wrong project={p!r}')
112 elif self.service.project:
113 # for in-project services, ensure the user can access the project
114 project = self.req.user.require_project(self.service.project.uid)
116 if not project:
117 if required:
118 raise gws.NotFoundError(f'ows {self.service.uid}: project not found')
119 return
121 self.project = project
122 cache_key = 'layer_caps_' + gws.u.sha256([self.service.uid, self.project.uid, sorted(self.req.user.roles)])
123 self.layerCapsList = gws.u.get_app_global(cache_key, self.enum_layer_caps)
125 def enum_layer_caps(self):
126 lcs = []
127 root_layer = self.service.rootLayer or self.project.map.rootLayer
128 self._enum_layer_caps(root_layer, lcs, [])
129 return lcs
131 def _enum_layer_caps(self, layer: gws.Layer, lcs: list[core.LayerCaps], stack: list[core.LayerCaps]):
 # Recursive worker for `enum_layer_caps`.
 # `layer` is the layer to inspect, `lcs` is the flat result list that is filled in place,
 # `stack` holds the LayerCaps of the groups enclosing `layer` (outermost first).
 #
 # NOTE(review): this listing has lost its indentation, so the nesting of the statements
 # after `if is_compat:` and of the trailing `for`/`if stack:` statements cannot be read
 # off the text - verify against the real file before changing anything here.

 # skip layers the user cannot read and layers not enabled for OWS
132 if not self.req.user.can_read(layer) or not layer.isEnabledForOws:
133 return
 # honor the per-layer allow and deny lists of service uids
135 ows = layer.ows
136 if ows and ows.allowedServiceUids and self.service.uid not in ows.allowedServiceUids:
137 return
138 if ows and ows.deniedServiceUids and self.service.uid in ows.deniedServiceUids:
139 return
 # incompatible non-group layers are dropped; incompatible groups are still descended into
141 is_compat = self.service.layer_is_compatible(layer)
142 if not is_compat and not layer.isGroup:
143 return
145 lc = layer_caps.for_layer(layer, self.req.user, self.service)
147 # NB groups must be inspected even if not 'compatible'
148 if layer.isGroup:
149 lc.isGroup = True
 # remember the current end of the list: inserting at `n` later puts the group
 # before everything its sub-layers have added
150 n = len(lcs)
151 for sub_layer in layer.layers:
152 self._enum_layer_caps(sub_layer, lcs, stack + [lc])
153 if not lc.children:
154 # no empty groups
155 return
156 if is_compat:
 # a group has a legend / is searchable if any of its children is
157 lc.hasLegend = any(c.hasLegend for c in lc.children)
158 lc.isSearchable = any(c.isSearchable for c in lc.children)
159 lcs.insert(n, lc)
160 else:
161 lc.isGroup = False
162 lcs.append(lc)
 # register this caps object as a leaf of every group on the stack
163 for sup_lc in stack:
164 sup_lc.leaves.append(lc)
 # register this caps object as a child of the innermost enclosing group
166 if stack:
167 stack[-1].children.append(lc)
169 ##
171 def requested_version(self, param_names: str) -> str:
172 p, val = self._get_param(param_names, '')
173 if not val:
174 # the first supported version is the default
175 return self.service.supportedVersions[0]
177 for v in gws.u.to_list(val):
178 for ver in self.service.supportedVersions:
179 if ver.startswith(v):
180 return ver
182 raise error.VersionNegotiationFailed()
184 _param2verb = {
185 'createstoredquery': gws.OwsVerb.CreateStoredQuery,
186 'describecoverage': gws.OwsVerb.DescribeCoverage,
187 'describefeaturetype': gws.OwsVerb.DescribeFeatureType,
188 'describelayer': gws.OwsVerb.DescribeLayer,
189 'describerecord': gws.OwsVerb.DescribeRecord,
190 'describestoredqueries': gws.OwsVerb.DescribeStoredQueries,
191 'dropstoredquery': gws.OwsVerb.DropStoredQuery,
192 'getcapabilities': gws.OwsVerb.GetCapabilities,
193 'getfeature': gws.OwsVerb.GetFeature,
194 'getfeatureinfo': gws.OwsVerb.GetFeatureInfo,
195 'getfeaturewithlock': gws.OwsVerb.GetFeatureWithLock,
196 'getlegendgraphic': gws.OwsVerb.GetLegendGraphic,
197 'getmap': gws.OwsVerb.GetMap,
198 'getprint': gws.OwsVerb.GetPrint,
199 'getpropertyvalue': gws.OwsVerb.GetPropertyValue,
200 'getrecordbyid': gws.OwsVerb.GetRecordById,
201 'getrecords': gws.OwsVerb.GetRecords,
202 'gettile': gws.OwsVerb.GetTile,
203 'liststoredqueries': gws.OwsVerb.ListStoredQueries,
204 'lockfeature': gws.OwsVerb.LockFeature,
205 'transaction': gws.OwsVerb.Transaction,
206 }
208 def requested_operation(self, param_names: str) -> gws.OwsOperation:
209 _, val = self._get_param(param_names, '')
210 op = self.find_operation(val)
211 if op:
212 return op
213 raise error.OperationNotSupported(val)
215 def find_operation(self, param: str) -> Optional[gws.OwsOperation]:
216 verb = self._param2verb.get(param.lower())
217 if not verb:
218 return
220 for op in self.service.supportedOperations:
221 if op.verb == verb:
222 return op
224 def requested_crs(self, param_names: str) -> Optional[gws.Crs]:
225 _, val = self._get_param(param_names, '')
226 if not val:
227 return
229 crs = gws.lib.crs.get(val)
230 if not crs:
231 raise error.InvalidCRS()
233 for b in self.service.supportedBounds:
234 if crs == b.crs:
235 return crs
237 raise error.InvalidCRS()
239 def requested_bounds(self, param_names: str) -> Optional[gws.Bounds]:
240 # OGC 06-042, 7.2.3.5
241 # OGC 00-028, 6.2.8.2.3
243 p, val = self._get_param(param_names, '')
244 if not val:
245 return
247 bounds = gws.lib.bounds.from_request_bbox(val, default_crs=self.crs, always_xy=self.alwaysXY)
248 if bounds:
249 return gws.lib.bounds.transform(bounds, self.crs)
251 raise error.InvalidParameterValue(p)
253 def requested_format(self, param_names: str) -> str:
254 _, val = self._get_param(param_names, '')
255 if val:
256 # NB our mime types do not contain spaces
257 return ''.join(val.split())
258 return ''
260 def requested_feature_count(self, param_names: str) -> int:
261 s = self.int_param(param_names, default=0)
262 if s <= 0:
263 return self.service.defaultFeatureCount
264 return min(self.service.maxFeatureCount, s)
266 def requested_xmlns_replacements(self):
267 s = self.string_param('NAMESPACES', default='')
268 if not s:
269 return {}
271 # OGC 09-025r1, Table 7
272 # xmlns(xml,http://www.w3.org/XML/1998/namespace),xmlns(ns37,https://our-ns),xmlns(wfs ... etc
274 d = {}
276 for xmlns, uri in re.findall(r'xmlns\((.+?),(.+?)\)', s):
277 ns = gws.lib.xmlx.namespace.find_by_uri(uri)
278 if not ns:
279 gws.log.debug(f'namespace not found: {uri=}')
280 raise error.InvalidParameterValue('NAMESPACES')
281 if ns.xmlns == xmlns:
282 continue
283 d[ns.uid] = xmlns
285 return d
287 ##
289 def _get_param(self, param_names, default):
290 names = gws.u.to_list(param_names.upper())
292 for p in names:
293 if p not in self.params:
294 continue
295 val = self.params[p]
296 return p, val
298 if default is not None:
299 return '', default
301 raise error.MissingParameterValue(names[0])
303 def string_param(self, param_names: str, values: Optional[set[str]] = None, default: Optional[str] = None) -> str:
304 p, val = self._get_param(param_names, default)
305 if values:
306 val = val.lower()
307 if val not in values:
308 raise error.InvalidParameterValue(p)
309 return val
311 def list_param(self, param_names: str) -> list[str]:
312 _, val = self._get_param(param_names, '')
313 return gws.u.to_list(val)
315 def int_param(self, param_names: str, default: Optional[int] = None) -> int:
316 p, val = self._get_param(param_names, default)
317 try:
318 return int(val)
319 except ValueError:
320 raise error.InvalidParameterValue(p)