Coverage for gws-app/gws/base/ows/server/action.py: 51%
53 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""OWS server action."""
3from typing import Optional, cast
5import gws
6import gws.base.action
7import gws.base.web
8import gws.lib.mime
9import gws.lib.xmlx
11from . import core, layer_caps, error
# Call gws.ext.new.action with the extension type name 'ows'.
# NOTE(review): presumably this declares the action type provided by this
# module to the gws extension machinery — confirm against gws.ext.
gws.ext.new.action('ows')
class GetServiceRequest(gws.Request):
    """Request parameters accepted by the ``owsService`` commands."""

    # Uid used to look up the target object of type
    # ``gws.ext.object.owsService`` via ``self.root.get``.
    serviceUid: str
class GetSchemaRequest(gws.Request):
    """Request parameters accepted by the ``owsXml`` command."""

    # Namespace identifier passed to ``gws.lib.xmlx.namespace.get``;
    # a trailing '.xsd' is stripped before the lookup.
    namespace: str
class Config(gws.base.action.Config):
    """OWS server action"""

    # No fields are declared here: the class body consists only of the
    # docstring above, so everything else comes from the base Config.
class Object(gws.base.action.Object):
    """Action object exposing the ``owsService`` and ``owsXml`` commands."""

    @gws.ext.command.get('owsService')
    def get_service(self, req: gws.WebRequester, p: GetServiceRequest) -> gws.ContentResponse:
        """Serve the GET variant of the ``owsService`` command."""
        return self._handle_service(req, p)

    @gws.ext.command.post('owsService')
    def post_service(self, req: gws.WebRequester, p: GetServiceRequest) -> gws.ContentResponse:
        """Serve the POST variant of the ``owsService`` command."""
        return self._handle_service(req, p)

    def _handle_service(self, req: gws.WebRequester, p: GetServiceRequest) -> gws.ContentResponse:
        """Resolve the service named by ``p.serviceUid`` and delegate to it.

        Raises:
            gws.NotFoundError: if the lookup in the root yields nothing.
            gws.ForbiddenError: if the requesting user may not use the service.
        """
        found = self.root.get(p.serviceUid, gws.ext.object.owsService)
        service = cast(gws.OwsService, found)

        if not service:
            raise gws.NotFoundError(f'{p.serviceUid=} not found')

        allowed = req.user.can_use(service)
        if not allowed:
            raise gws.ForbiddenError(f'{p.serviceUid=} forbidden')

        return service.handle_request(req)

    @gws.ext.command.get('owsXml')
    def get_schema(self, req: gws.WebRequester, p: GetSchemaRequest) -> gws.ContentResponse:
        """Return the XML schema for the requested namespace.

        Any exception raised while building the schema is converted into an
        XML error response instead of propagating to the caller.
        """
        try:
            xml_text = self._make_schema(req, p)
        except Exception as exc:
            return error.from_exception(exc).to_xml_response()
        else:
            return gws.ContentResponse(mime=gws.lib.mime.XML, content=xml_text)

    def _make_schema(self, req, p) -> str:
        """Build the schema document for ``p.namespace`` as a string.

        Only layers the user can read, and whose OWS XML namespace has the
        same ``xmlns`` as the requested namespace, contribute to the schema.

        Raises:
            gws.NotFoundError: if the namespace is unknown or no schema
                element could be produced.
        """
        # Accept both "name" and "name.xsd".
        ns_name = p.namespace.removesuffix('.xsd')

        ns = gws.lib.xmlx.namespace.get(ns_name)
        if not ns:
            raise gws.NotFoundError(f'namespace not found: {p.namespace=}')

        user = req.user
        layers = (cast(gws.Layer, la) for la in self.root.find_all(gws.ext.object.layer))
        caps_list = [
            layer_caps.for_layer(layer, user)
            for layer in layers
            if user.can_read(layer)
            and layer.ows.xmlNamespace
            and layer.ows.xmlNamespace.xmlns == ns.xmlns
        ]

        schema_el, opts = layer_caps.xml_schema(caps_list, user)
        if not schema_el:
            raise gws.NotFoundError(f'cannot create schema: {p.namespace=}')

        # Emit a complete, standalone XML document.
        opts.withNamespaceDeclarations = True
        opts.withSchemaLocations = True
        opts.withXmlDeclaration = True

        return schema_el.to_string(opts)