Coverage for gws-app/gws/base/ows/server/service.py: 66%

194 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""OWS Service.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.base.legend 

7import gws.base.web 

8import gws.config.util 

9import gws.lib.bounds 

10import gws.lib.crs 

11import gws.lib.datetimex 

12import gws.lib.extent 

13import gws.lib.image 

14import gws.base.metadata 

15import gws.lib.mime 

16import gws.lib.xmlx 

17from . import core, error, request 

18 

19 

class Config(gws.ConfigWithAccess):
    """Configuration for OWS services."""

    # NOTE: `extent` used to be declared twice in this class with two different
    # docstrings; only the later declaration was effective, so the stale
    # duplicate has been dropped.

    defaultFeatureCount: int = 1000
    """Default number of features per page."""
    extent: Optional[gws.Extent]
    """Service extent, in extentCrs or the first supported CRS."""
    extentCrs: Optional[gws.CrsName]
    """Service extent CRS. (added in 8.2)"""
    imageFormats: Optional[list[gws.lib.image.FormatConfig]]
    """Supported image formats."""
    maxFeatureCount: int = 10000
    """Max number of features per page."""
    metadata: Optional[gws.base.metadata.Config]
    """Service metadata."""
    rootLayerUid: str = ''
    """Root layer uid."""
    searchTolerance: gws.UomValueStr = '10px'
    """Search pixel tolerance."""
    supportedCrs: Optional[list[gws.CrsName]]
    """List of CRS supported by this service."""
    templates: Optional[list[gws.ext.config.template]]
    """XML and HTML templates."""
    updateSequence: Optional[str]
    """Service update sequence."""
    withInspireMeta: bool = False
    """Emit INSPIRE Metadata."""
    withStrictParams: bool = False
    """Use strict params checking."""

51 

52 

class Object(gws.OwsService):
    """Baseclass for OWS services.

    Holds the shared configuration steps (bounds, image formats, templates,
    metadata), turns an incoming web request into a service request object,
    dispatches it to the operation handler and provides helpers for building
    template, XML, image, legend and feature collection responses.
    """

    def configure(self):
        """Copy scalar options from the config and run all ``configure_*`` steps."""
        self.project = self.find_closest(gws.ext.object.project)

        self.updateSequence = self.cfg('updateSequence')
        self.withInspireMeta = self.cfg('withInspireMeta')
        self.withStrictParams = self.cfg('withStrictParams')

        self.maxFeatureCount = self.cfg('maxFeatureCount')
        self.defaultFeatureCount = self.cfg('defaultFeatureCount')
        self.searchTolerance = self.cfg('searchTolerance')

        self.configure_bounds()
        self.configure_image_formats()
        self.configure_templates()
        self.configure_operations()
        self.configure_metadata()

    def configure_image_formats(self):
        """Populate ``imageFormats`` from the config, or default to PNG and JPEG.

        Spaces are stripped from configured mime types; missing options
        become an empty dict.
        """
        configured = self.cfg('imageFormats')
        if configured:
            self.imageFormats = [
                gws.ImageFormat(
                    mimeTypes=[s.replace(' ', '') for s in cfg.get('mimeTypes', [])],
                    options=cfg.get('options') or {},
                )
                for cfg in configured
            ]
            return

        self.imageFormats = [
            gws.ImageFormat(mimeTypes=[gws.lib.mime.PNG], options={}),
            gws.ImageFormat(mimeTypes=[gws.lib.mime.JPEG], options={}),
        ]

    def configure_bounds(self):
        """Compute ``supportedBounds``: the service extent in every supported CRS.

        The CRS list is taken from ``supportedCrs``, else from the project map,
        else it is Web Mercator plus WGS84. The extent is taken from ``extent``
        (interpreted in ``extentCrs`` or, if that is not set, in the first
        supported CRS), else from the project map bounds, else it is the
        extent of the first supported CRS.

        Returns:
            Always ``True``.
        """
        crs_names = self.cfg('supportedCrs')
        if crs_names:
            crs_list = [gws.lib.crs.require(s) for s in crs_names]
        elif self.project:
            crs_list = [self.project.map.bounds.crs]
        else:
            crs_list = [gws.lib.crs.WEBMERCATOR, gws.lib.crs.WGS84]

        extent = self.cfg('extent')
        if extent:
            extent_crs_name = self.cfg('extentCrs')
            extent_crs = gws.lib.crs.require(extent_crs_name) if extent_crs_name else crs_list[0]
            bounds = gws.Bounds(crs=extent_crs, extent=gws.lib.extent.from_list(extent))
        elif self.project:
            bounds = self.project.map.bounds
        else:
            bounds = gws.Bounds(crs=crs_list[0], extent=crs_list[0].extent)

        self.supportedBounds = [gws.lib.bounds.transform(bounds, target) for target in crs_list]
        return True

    def configure_templates(self):
        """Configure templates for this service via ``gws.config.util``."""
        return gws.config.util.configure_templates_for(self)

    def configure_operations(self):
        """Hook for operation setup; does nothing in the base class."""
        pass

    def available_formats(self, verb: gws.OwsVerb):
        """Return the sorted, de-duplicated mime types available for a verb.

        For image verbs these are the mime types of the configured image
        formats; for other verbs, the mime types of the templates whose
        subject is ``ows.<verb>``.
        """
        fs = set()

        if verb in core.IMAGE_VERBS:
            for fmt in self.imageFormats:
                fs.update(fmt.mimeTypes)
        else:
            subject = f'ows.{verb}'
            for tpl in self.templates:
                if tpl.subject == subject:
                    fs.update(tpl.mimeTypes)

        return sorted(fs)

    def configure_metadata(self):
        """Build ``metadata`` from the project, application and configured metadata.

        Returns:
            Always ``True``.
        """
        self.metadata = gws.base.metadata.from_args(
            self.project.metadata if self.project else None,
            self.root.app.metadata,
            self.cfg('metadata'),
        )
        return True

    def post_configure(self):
        """Run the configuration steps that need the object tree to be complete."""
        self.post_configure_root_layer()

    def post_configure_root_layer(self):
        """Resolve ``rootLayerUid`` into ``rootLayer`` and reconcile the project.

        Without a configured uid, ``rootLayer`` stays ``None``. If the service
        has no project of its own, the project of the root layer is adopted.

        Raises:
            gws.ConfigurationError: If the layer is not found, or if it belongs
                to a project other than the one of this service.
        """
        self.rootLayer = None

        uid = self.cfg('rootLayerUid')
        if not uid:
            return

        self.rootLayer = cast(gws.Layer, self.root.get(uid, gws.ext.object.layer))
        if not self.rootLayer:
            raise gws.ConfigurationError(f'root layer {uid!r} not found')

        prj = cast(gws.Project, self.rootLayer.find_closest(gws.ext.object.project))
        if prj and not self.project:
            self.project = prj
            return

        if self.project != prj:
            raise gws.ConfigurationError(f'root layer {uid!r} does not belong to {self.project!r}')

    ##

    def url_path(self, sr: request.Object) -> str:
        """Return the action url path of this service for the given request.

        The project uid is included only when the request refers to a project
        different from the service's own project.
        """
        if sr.project and sr.project != self.project:
            return gws.u.action_url_path('owsService', serviceUid=self.uid, projectUid=sr.project.uid)
        return gws.u.action_url_path('owsService', serviceUid=self.uid)

    ##

    def init_request(self, req: gws.WebRequester) -> request.Object:
        """Create a service request object from a web request.

        GET requests use the request params as they are. POST requests with an
        XML content type are parsed; if the document root is a SOAP
        ``Envelope``, the first element inside its ``Body`` is used as the
        actual request, which is then converted to params by
        ``parse_xml_request``.

        Raises:
            gws.base.web.error.BadRequest: For other methods or content types,
                unparsable XML, a SOAP envelope without a payload element,
                or when ``parse_xml_request`` yields no params.
        """
        if req.method == 'GET':
            return request.Object(self, req, req.params())

        if req.method == 'POST' and gws.lib.mime.get(req.contentType) == gws.lib.mime.XML:
            try:
                xml = gws.lib.xmlx.from_string(req.text(), gws.XmlOptions(removeNamespaces=True))
            except gws.lib.xmlx.Error as exc:
                raise gws.base.web.error.BadRequest() from exc

            is_soap = xml.name == 'Envelope'
            if is_soap:
                # NOTE(review): findfirst is assumed to return None when nothing
                # matches. Compare with `is None` rather than by truthiness, as
                # an element without children may evaluate as false.
                try:
                    body = xml.findfirst('Body')
                    payload = body.findfirst() if body is not None else None
                except gws.lib.xmlx.Error as exc:
                    raise gws.base.web.error.BadRequest() from exc
                if payload is None:
                    # An envelope with a missing or empty Body is a client error.
                    raise gws.base.web.error.BadRequest()
                xml = payload

            params = self.parse_xml_request(xml)
            if not params:
                raise gws.base.web.error.BadRequest()
            return request.Object(self, req, params, xml_element=xml, is_soap=is_soap)

        # @TODO support application/x-www-form-urlencoded
        raise gws.base.web.error.BadRequest()

    def parse_xml_request(self, xml: gws.XmlElement) -> Optional[dict]:
        """Convert an XML request element to a params dict.

        The base implementation returns an empty dict, which ``init_request``
        treats as a bad request.
        """
        return {}

    def handle_request(self, req: gws.WebRequester) -> gws.ContentResponse:
        """Initialize and dispatch a request, converting any failure to an XML error response.

        This is the service boundary: every exception is turned into a service
        error and rendered as 'ows' or 'ogc' XML, depending on ``isOwsCommon``.
        """
        try:
            sr = self.init_request(req)
            return self.dispatch_request(sr)
        except Exception as exc:
            err = error.from_exception(exc)
            # @TODO INIMAGE Exceptions
            # verb = req.param('REQUEST')
            # if verb in core.IMAGE_VERBS:
            #     return err.to_image_response()
            return err.to_xml_response('ows' if self.isOwsCommon else 'ogc')

    def dispatch_request(self, sr: request.Object):
        """Call the handler method named by the request's operation and return its result."""
        handler = getattr(self, sr.operation.handlerName)
        return handler(sr)

    def get_template(self, sr: request.Object, mime: str = '') -> Optional[gws.Template]:
        """Find a template for the given service request."""
        return self.root.app.templateMgr.find_template(
            f'ows.{sr.operation.verb}',
            where=[self, sr.project],
            user=sr.req.user,
            mime=mime,
        )

    def template_response(self, sr: request.Object, mime: str = '', **kwargs) -> gws.ContentResponse:
        """Render a template for the given service request.

        If no template exists for ``mime``, the XML template is tried instead.
        Extra keyword arguments are passed on to the template arguments.

        Raises:
            error.InvalidFormat: If there is no XML template either.
        """

        tpl = self.get_template(sr, mime=mime)
        if not tpl:
            # OGC 06-042, 7.2.3.1
            # If the request specifies a format not supported by the server, the server shall respond with the default text/xml format.
            gws.log.debug(f'no template: {sr.operation.verb=} {mime=}')
            if mime == gws.lib.mime.XML:
                # XML was already the fallback, so there is nothing left to try.
                raise error.InvalidFormat()
            return self.template_response(sr, gws.lib.mime.XML, **kwargs)

        args = request.TemplateArgs(
            sr=sr,
            service=self,
            serviceUrl=sr.req.url_for(self.url_path(sr)),
            url_for=sr.req.url_for,
            version=sr.version,
            # The version with dots removed, as an integer, e.g. '1.3.0' -> 130.
            intVersion=int(sr.version.replace('.', '')),
            **kwargs,
        )

        return tpl.render(gws.TemplateRenderInput(args=args))

    def xml_response(self, el: gws.XmlElement, opts: Optional[gws.XmlOptions] = None) -> gws.ContentResponse:
        """Serialize an XML element and wrap it in an XML content response."""
        xml = el.to_string(opts)
        return gws.ContentResponse(mime=gws.lib.mime.XML, content=xml)

    def image_response(self, sr: request.Object, img: Optional[gws.Image], mime: str) -> gws.ContentResponse:
        """Encode an image as ``mime`` using the options of the matching image format.

        When ``img`` is empty, an empty pixel of the requested type is returned.

        Raises:
            error.InvalidFormat: If ``mime`` is not a configured image format
                (checked even when there is no image).
        """
        ifmt = self.find_image_format(mime)
        if not img:
            return gws.ContentResponse(mime=mime, content=gws.lib.image.empty_pixel(mime))

        gws.log.debug(f'image_response: {img.mode()=} {img.size()=} {mime=} {ifmt.options}')
        return gws.ContentResponse(mime=mime, content=img.to_bytes(mime, ifmt.options))

    def find_image_format(self, mime: str) -> gws.ImageFormat:
        """Return the configured image format that lists ``mime``.

        An empty ``mime`` selects the first configured format.

        Raises:
            error.InvalidFormat: If no configured format lists ``mime``.
        """
        if not mime:
            return self.imageFormats[0]
        for f in self.imageFormats:
            if mime in f.mimeTypes:
                return f
        raise error.InvalidFormat()

    def render_legend(self, sr: request.Object, lcs: list[core.LayerCaps], mime: str) -> gws.ContentResponse:
        """Render a combined legend for the given layers as an image response.

        The response is obtained through ``gws.u.get_app_global`` under a key
        built from the layer uids and the mime type.
        NOTE(review): presumably this caches the response application-wide,
        independently of ``sr`` -- confirm.
        """
        uids = [lc.layer.uid for lc in lcs]
        cache_key = 'gws.base.ows.server.legend.' + gws.u.sha256(uids) + mime

        def _get():
            legend = cast(
                gws.Legend,
                self.root.create_temporary(
                    gws.ext.object.legend,
                    type='combined',
                    layerUids=uids,
                ),
            )
            lro = legend.render()
            if not lro:
                # No legend output: respond with an empty pixel.
                return self.image_response(sr, None, mime)
            return self.image_response(sr, gws.base.legend.output_to_image(lro), mime)

        return gws.u.get_app_global(cache_key, _get)

    def feature_collection(self, sr: request.Object, lcs: list[core.LayerCaps], hits: int, results: list[gws.SearchResult]) -> core.FeatureCollection:
        """Build a feature collection from search results.

        Each result feature is transformed to the request's target CRS (by
        calling ``transform_to`` on the feature itself) and paired with the
        layer caps of its layer, if the layer is among ``lcs``.

        Args:
            sr: Service request; provides the target CRS.
            lcs: Layer caps to match result layers against.
            hits: Stored as ``numMatched``.
            results: Search results; their count is stored as ``numReturned``.
        """
        fc = core.FeatureCollection(
            members=[],
            numMatched=hits,
            numReturned=len(results),
            timestamp=gws.lib.datetimex.to_iso_string(with_tz=':'),
        )

        # Layers are matched by object identity.
        lcs_map = {id(lc.layer): lc for lc in lcs}

        for r in results:
            r.feature.transform_to(sr.targetCrs)
            fc.members.append(
                core.FeatureCollectionMember(
                    feature=r.feature,
                    layer=r.layer,
                    layerCaps=lcs_map.get(id(r.layer)) if r.layer else None,
                )
            )

        return fc