Coverage for gws-app/gws/base/ows/server/service.py: 66%
194 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""OWS Service."""
3from typing import Optional, cast
5import gws
6import gws.base.legend
7import gws.base.web
8import gws.config.util
9import gws.lib.bounds
10import gws.lib.crs
11import gws.lib.datetimex
12import gws.lib.extent
13import gws.lib.image
14import gws.base.metadata
15import gws.lib.mime
16import gws.lib.xmlx
17from . import core, error, request
20class Config(gws.ConfigWithAccess):
21 """Configuration for OWS services."""
23 defaultFeatureCount: int = 1000
24 """Default number of features per page."""
25 extent: Optional[gws.Extent]
26 """Service extent."""
27 extent: Optional[gws.Extent]
28 """Service extent, in extentCrs or the first supported CRS."""
29 extentCrs: Optional[gws.CrsName]
30 """Service extent CRS. (added in 8.2)"""
31 imageFormats: Optional[list[gws.lib.image.FormatConfig]]
32 """Supported image formats."""
33 maxFeatureCount: int = 10000
34 """Max number of features per page."""
35 metadata: Optional[gws.base.metadata.Config]
36 """Service metadata."""
37 rootLayerUid: str = ''
38 """Root layer uid."""
39 searchTolerance: gws.UomValueStr = '10px'
40 """Search pixel tolerance."""
41 supportedCrs: Optional[list[gws.CrsName]]
42 """List of CRS supported by this service."""
43 templates: Optional[list[gws.ext.config.template]]
44 """XML and HTML templates."""
45 updateSequence: Optional[str]
46 """Service update sequence."""
47 withInspireMeta: bool = False
48 """Emit INSPIRE Metadata."""
49 withStrictParams: bool = False
50 """Use strict params checking."""
53class Object(gws.OwsService):
54 """Baseclass for OWS services."""
56 def configure(self):
57 self.project = self.find_closest(gws.ext.object.project)
59 self.updateSequence = self.cfg('updateSequence')
60 self.withInspireMeta = self.cfg('withInspireMeta')
61 self.withStrictParams = self.cfg('withStrictParams')
63 self.maxFeatureCount = self.cfg('maxFeatureCount')
64 self.defaultFeatureCount = self.cfg('defaultFeatureCount')
65 self.searchTolerance = self.cfg('searchTolerance')
67 self.configure_bounds()
68 self.configure_image_formats()
69 self.configure_templates()
70 self.configure_operations()
71 self.configure_metadata()
73 def configure_image_formats(self):
74 p = self.cfg('imageFormats')
75 if p:
76 self.imageFormats = []
77 for cfg in p:
78 self.imageFormats.append(
79 gws.ImageFormat(
80 mimeTypes=[s.replace(' ', '') for s in cfg.get('mimeTypes', [])],
81 options=cfg.get('options') or {},
82 )
83 )
84 return
86 self.imageFormats = [
87 gws.ImageFormat(mimeTypes=[gws.lib.mime.PNG], options={}),
88 gws.ImageFormat(mimeTypes=[gws.lib.mime.JPEG], options={}),
89 ]
91 def configure_bounds(self):
92 p = self.cfg('supportedCrs')
93 if p:
94 crs_list = [gws.lib.crs.require(s) for s in p]
95 elif self.project:
96 crs_list = [self.project.map.bounds.crs]
97 else:
98 crs_list = [gws.lib.crs.WEBMERCATOR, gws.lib.crs.WGS84]
100 p = self.cfg('extent')
101 if p:
102 q = self.cfg('extentCrs')
103 crs = gws.lib.crs.require(q) if q else crs_list[0]
104 bounds = gws.Bounds(crs=crs, extent=gws.lib.extent.from_list(p))
105 elif self.project:
106 bounds = self.project.map.bounds
107 else:
108 bounds = gws.Bounds(crs=crs_list[0], extent=crs_list[0].extent)
110 self.supportedBounds = [gws.lib.bounds.transform(bounds, crs) for crs in crs_list]
111 return True
113 def configure_templates(self):
114 return gws.config.util.configure_templates_for(self)
116 def configure_operations(self):
117 pass
119 def available_formats(self, verb: gws.OwsVerb):
120 fs = set()
122 if verb in core.IMAGE_VERBS:
123 for fmt in self.imageFormats:
124 fs.update(fmt.mimeTypes)
125 else:
126 for tpl in self.templates:
127 if tpl.subject == f'ows.{verb}':
128 fs.update(tpl.mimeTypes)
130 return sorted(fs)
132 def configure_metadata(self):
133 self.metadata = gws.base.metadata.from_args(
134 self.project.metadata if self.project else None,
135 self.root.app.metadata,
136 self.cfg('metadata'),
137 )
138 return True
140 def post_configure(self):
141 self.post_configure_root_layer()
143 def post_configure_root_layer(self):
144 self.rootLayer = None
146 uid = self.cfg('rootLayerUid')
147 if not uid:
148 return
150 self.rootLayer = cast(gws.Layer, self.root.get(uid, gws.ext.object.layer))
151 if not self.rootLayer:
152 raise gws.ConfigurationError(f'root layer {uid!r} not found')
154 prj = cast(gws.Project, self.rootLayer.find_closest(gws.ext.object.project))
155 if prj and not self.project:
156 self.project = prj
157 return
159 if self.project != prj:
160 raise gws.ConfigurationError(f'root layer {uid!r} does not belong to {self.project!r}')
162 ##
164 def url_path(self, sr: request.Object) -> str:
165 if sr.project and sr.project != self.project:
166 return gws.u.action_url_path('owsService', serviceUid=self.uid, projectUid=sr.project.uid)
167 else:
168 return gws.u.action_url_path('owsService', serviceUid=self.uid)
170 ##
172 def init_request(self, req: gws.WebRequester) -> request.Object:
173 if req.method == 'GET':
174 return request.Object(self, req, req.params())
176 if req.method == 'POST' and gws.lib.mime.get(req.contentType) == gws.lib.mime.XML:
177 try:
178 xml = gws.lib.xmlx.from_string(req.text(), gws.XmlOptions(removeNamespaces=True))
179 except gws.lib.xmlx.Error:
180 raise gws.base.web.error.BadRequest()
182 is_soap = False
183 if xml.name == 'Envelope':
184 is_soap = True
185 try:
186 xml = xml.findfirst('Body').findfirst()
187 except gws.lib.xmlx.Error:
188 raise gws.base.web.error.BadRequest()
190 params = self.parse_xml_request(xml)
191 if not params:
192 raise gws.base.web.error.BadRequest()
193 return request.Object(self, req, params, xml_element=xml, is_soap=is_soap)
195 # @TODO support application/x-www-form-urlencoded
196 raise gws.base.web.error.BadRequest()
198 def parse_xml_request(self, xml: gws.XmlElement) -> Optional[dict]:
199 return {}
201 def handle_request(self, req: gws.WebRequester) -> gws.ContentResponse:
202 try:
203 sr = self.init_request(req)
204 return self.dispatch_request(sr)
205 except Exception as exc:
206 err = error.from_exception(exc)
207 # @TODO INIMAGE Exceptions
208 # verb = req.param('REQUEST')
209 # if verb in core.IMAGE_VERBS:
210 # return err.to_image_response()
211 return err.to_xml_response('ows' if self.isOwsCommon else 'ogc')
213 def dispatch_request(self, sr: request.Object):
214 fn = getattr(self, sr.operation.handlerName)
215 return fn(sr)
217 def get_template(self, sr: request.Object, mime: str = '') -> Optional[gws.Template]:
218 """Find a template for the given service request."""
219 return self.root.app.templateMgr.find_template(
220 f'ows.{sr.operation.verb}',
221 where=[self, sr.project],
222 user=sr.req.user,
223 mime=mime,
224 )
226 def template_response(self, sr: request.Object, mime: str = '', **kwargs) -> gws.ContentResponse:
227 """Render a template for the given service request."""
229 tpl = self.get_template(sr, mime=mime)
230 if not tpl:
231 # OGC 06-042, 7.2.3.1
232 # If the request specifies a format not supported by the server, the server shall respond with the default text/xml format.
233 gws.log.debug(f'no template: {sr.operation.verb=} {mime=}')
234 if mime == gws.lib.mime.XML:
235 raise error.InvalidFormat()
236 return self.template_response(sr, gws.lib.mime.XML, **kwargs)
238 args = request.TemplateArgs(
239 sr=sr,
240 service=self,
241 serviceUrl=sr.req.url_for(self.url_path(sr)),
242 url_for=sr.req.url_for,
243 version=sr.version,
244 intVersion=int(sr.version.replace('.', '')),
245 **kwargs,
246 )
248 return tpl.render(gws.TemplateRenderInput(args=args))
250 def xml_response(self, el: gws.XmlElement, opts: gws.XmlOptions = None) -> gws.ContentResponse:
251 xml = el.to_string(opts)
252 return gws.ContentResponse(mime=gws.lib.mime.XML, content=xml)
254 def image_response(self, sr: request.Object, img: Optional[gws.Image], mime: str) -> gws.ContentResponse:
255 ifmt = self.find_image_format(mime)
256 if img:
257 gws.log.debug(f'image_response: {img.mode()=} {img.size()=} {mime=} {ifmt.options}')
258 content = img.to_bytes(mime, ifmt.options) if img else gws.lib.image.empty_pixel(mime)
259 return gws.ContentResponse(mime=mime, content=content)
261 def find_image_format(self, mime: str) -> gws.ImageFormat:
262 if not mime:
263 return self.imageFormats[0]
264 for f in self.imageFormats:
265 if mime in f.mimeTypes:
266 return f
267 raise error.InvalidFormat()
269 def render_legend(self, sr: request.Object, lcs: list[core.LayerCaps], mime: str) -> gws.ContentResponse:
270 uids = [lc.layer.uid for lc in lcs]
271 cache_key = 'gws.base.ows.server.legend.' + gws.u.sha256(uids) + mime
273 def _get():
274 legend = cast(
275 gws.Legend,
276 self.root.create_temporary(
277 gws.ext.object.legend,
278 type='combined',
279 layerUids=uids,
280 ),
281 )
282 lro = legend.render()
283 if not lro:
284 return self.image_response(sr, None, mime)
285 return self.image_response(sr, gws.base.legend.output_to_image(lro), mime)
287 return gws.u.get_app_global(cache_key, _get)
289 def feature_collection(self, sr: request.Object, lcs: list[core.LayerCaps], hits: int, results: list[gws.SearchResult]) -> core.FeatureCollection:
290 fc = core.FeatureCollection(
291 members=[],
292 numMatched=hits,
293 numReturned=len(results),
294 timestamp=gws.lib.datetimex.to_iso_string(with_tz=':'),
295 )
297 lcs_map = {id(lc.layer): lc for lc in lcs}
299 for r in results:
300 r.feature.transform_to(sr.targetCrs)
301 fc.members.append(
302 core.FeatureCollectionMember(
303 feature=r.feature,
304 layer=r.layer,
305 layerCaps=lcs_map.get(id(r.layer)) if r.layer else None,
306 )
307 )
309 return fc