Coverage for gws-app / gws / base / web / wsgi.py: 65%
254 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Basic WSGI request/response handling."""
3import gzip
4import io
5import os
6from typing import cast
8import urllib.parse
9import werkzeug.formparser
10import werkzeug.utils
11import werkzeug.wrappers
12import werkzeug.wsgi
14import gws
15import gws.lib.jsonx
16import gws.lib.mime
17import gws.lib.vendor.umsgpack as umsgpack
19from . import error
22class Responder(gws.WebResponder):
23 def __init__(self, **kwargs):
24 if 'wz' in kwargs:
25 self._wz = kwargs['wz']
26 else:
27 self._wz = werkzeug.wrappers.Response(**kwargs)
28 self.status = self._wz.status_code
30 def __repr__(self):
31 return f'<Responder {self._wz}>'
33 def send_response(self, environ, start_response):
34 return self._wz(environ, start_response)
36 def set_cookie(self, key, value, **kwargs):
37 self._wz.set_cookie(key, value, **kwargs)
39 def delete_cookie(self, key, **kwargs):
40 self._wz.delete_cookie(key, **kwargs)
42 def add_header(self, key, value):
43 self._wz.headers.add(key, value)
45 def set_status(self, status):
46 self._wz.status_code = int(status)
47 self.status = self._wz.status_code
49 def set_body(self, body):
50 if isinstance(body, str):
51 body = body.encode('utf-8')
52 self._wz.set_data(body)
55class Requester(gws.WebRequester):
56 _STRUCT_JSON = 'json'
57 _STRUCT_MSGPACK = 'msgpack'
59 _struct_mime = {
60 _STRUCT_JSON: 'application/json',
61 _STRUCT_MSGPACK: 'application/msgpack',
62 }
64 def __init__(self, root: gws.Root, environ: dict, site: gws.WebSite, **kwargs):
65 if 'wz' in kwargs:
66 self._wz = kwargs['wz']
67 else:
68 self._wz = werkzeug.wrappers.Request(environ)
70 # this is also set in nginx (see server/ini), but we need this for unzipping (see data() below)
71 self.maxContentLength = int(root.app.cfg('server.web.maxRequestLength') or 1) * 1024 * 1024
72 self._wz.max_content_length = self.maxContentLength
74 self.root = root
75 self.site = site
77 self.environ = self._wz.environ
78 self.method = cast(gws.RequestMethod, self._wz.method.upper())
79 self.isSecure = self._wz.is_secure
81 self.session = root.app.authMgr.guestSession
82 self.user = root.app.authMgr.guestUser
84 self.isGet = self.method == gws.RequestMethod.GET
85 self.isPost = self.method == gws.RequestMethod.POST
86 self.isForm = False
87 self.isApi = False
89 self.structInput = None
90 self.structOutput = None
92 self.contentTypeHeader = self.header('content-type', '').lower().split(';')[0].strip()
93 self.contentType = gws.lib.mime.get(self.contentTypeHeader) or gws.lib.mime.BIN
95 if self.isPost:
96 if self.contentTypeHeader == 'application/x-www-form-urlencoded' or self.contentTypeHeader == 'multipart/form-data':
97 self.isForm = True
98 else:
99 self.structInput = self._struct_type(self.contentTypeHeader)
100 if self.structInput:
101 self.isApi = True
102 self.structOutput = self._struct_type(self.header('accept')) or self.structInput
104 self._parsed_params = {}
105 self._parsed_params_lc = {}
106 self._parsed_query_params = {}
107 self._parsed_struct = {}
108 self._parsed_command = ''
109 self._parsed_path = ''
110 self._parsed = False
111 self._raw_post_data = None
112 self._uid = gws.u.mstime()
114 if self.root.app.developer_option('request.log_all'):
115 u = {
116 'method': self.method,
117 'path': self._wz.path,
118 'query': self._wz.query_string,
119 'headers': self._wz.headers,
120 'environ': self._wz.environ,
121 }
122 gws.u.write_debug_file(f'request_{self._uid}', ''.join(f'{k}={v!r}\n' for k, v in u.items()))
124 def __repr__(self):
125 return f'<Requester {self._wz}>'
127 def parse(self):
128 self._parse()
130 def params(self):
131 self._parse()
132 return self._parsed_params
134 def query_params(self):
135 self._parse()
136 return self._parsed_query_params
138 def path(self):
139 self._parse()
140 return self._parsed_path
142 def struct(self):
143 self._parse()
144 return self._parsed_struct
146 def command(self):
147 self._parse()
148 return self._parsed_command
150 def data(self):
151 if not self.isPost:
152 return b''
154 if self._raw_post_data is not None:
155 return self._raw_post_data
157 cl = self.header('content-length')
158 if not cl:
159 self._raw_post_data = b''
160 return self._raw_post_data
161 try:
162 cl = int(cl)
163 except ValueError as exc:
164 raise error.BadRequest('invalid content-length header') from exc
165 if cl == 0:
166 self._raw_post_data = b''
167 return self._raw_post_data
168 if cl > self.maxContentLength:
169 raise error.RequestEntityTooLarge(f'content-length header too large: {cl}')
171 data = self._wz.get_data(as_text=False, cache=False, parse_form_data=False)
173 if self.root.app.developer_option('request.log_all'):
174 gws.u.write_debug_file(f'request_{self._uid}.data', data)
176 if self.header('content-encoding') == 'gzip':
177 try:
178 with gzip.GzipFile(fileobj=io.BytesIO(data)) as fp:
179 data = fp.read(self.maxContentLength)
180 except OSError as exc:
181 raise error.BadRequest('gzip data error') from exc
183 self._raw_post_data = data
184 return data
186 def text(self):
187 data = self.data()
188 if not data:
189 return ''
191 charset = self._wz.mimetype_params.get('charset', 'utf-8')
192 try:
193 return data.decode(encoding=charset, errors='strict')
194 except UnicodeDecodeError as exc:
195 raise error.BadRequest('text data decoding error') from exc
197 def form(self):
198 if not self.isForm:
199 return []
200 data = self.data()
201 if not data:
202 return []
204 try:
205 stream = io.BytesIO(data)
206 opts = self._wz.mimetype_params
208 # Fix for Qt multipart/form-data boundaries
209 # Qt boundaries start with "boundary_.oOo._" and are base64-encoded
210 # https://github.com/qt/qtbase/blob/04b7fc2de3c97174a725bbd4fdc0f6e496c85861/src/network/access/qhttpmultipart.cpp#L400
211 # Werkzeug header parser does not understand base64 chars "/+="
212 if data.startswith(b'--boundary_.oOo._'):
213 boundary = data[2 : data.find(b'\r\n')].decode('ascii')
214 opts['boundary'] = boundary
216 parser = werkzeug.formparser.FormDataParser(silent=False)
217 _, form, files = parser.parse(
218 stream,
219 mimetype=self.contentTypeHeader,
220 content_length=len(data),
221 options=opts,
222 )
223 return list(form.items()) + list(files.items())
224 except Exception as exc:
225 raise error.BadRequest(f'form decode error: {exc}') from exc
227 def env(self, key, default=''):
228 return self._wz.environ.get(key, default)
230 def has_param(self, key):
231 self._parse()
232 return key.lower() in self._parsed_params_lc
234 def param(self, key, default=''):
235 self._parse()
236 return self._parsed_params_lc.get(key.lower(), default)
238 def header(self, key, default=''):
239 return self._wz.headers.get(key, default)
241 def cookie(self, key, default=''):
242 return self._wz.cookies.get(key, default)
244 def content_responder(self, response):
245 args: dict = {
246 'mimetype': response.mime,
247 'status': response.status or 200,
248 'headers': {},
249 'direct_passthrough': False,
250 }
252 if response.contentFilename:
253 if response.contentFilename.isascii():
254 args['headers']['Content-Disposition'] = f'attachment; filename="{response.contentFilename}"'
255 else:
256 args['headers']['Content-Disposition'] = 'attachment; filename="{}"; filename*=UTF-8\'\'{}'.format(
257 response.contentFilename.encode('ascii', errors='replace').decode('ascii'),
258 urllib.parse.quote(response.contentFilename, safe=''),
259 )
260 args['mimetype'] = args['mimetype'] or gws.lib.mime.for_path(response.contentFilename)
262 if response.contentPath:
263 args['response'] = werkzeug.wsgi.wrap_file(self.environ, open(response.contentPath, 'rb'))
264 args['headers']['Content-Length'] = str(os.path.getsize(response.contentPath))
265 args['mimetype'] = args['mimetype'] or gws.lib.mime.for_path(response.contentPath)
266 args['direct_passthrough'] = True
267 else:
268 args['response'] = response.content
270 if response.headers:
271 args['headers'].update(response.headers)
273 return Responder(**args)
275 def redirect_responder(self, response):
276 wz = werkzeug.utils.redirect(response.location, response.status or 302)
277 if response.headers:
278 wz.headers.update(response.headers)
279 return Responder(wz=wz)
281 def api_responder(self, response):
282 typ = self.structOutput or self._STRUCT_JSON
283 return Responder(
284 response=self._encode_struct(response, typ),
285 mimetype=self._struct_mime[typ],
286 status=response.status or 200,
287 )
289 def error_responder(self, exc):
290 err = exc if isinstance(exc, error.HTTPException) else error.InternalServerError()
291 return Responder(wz=err.get_response(self._wz.environ))
293 ##
295 def url_for(self, path, **kwargs):
296 return self.site.url_for(self, path, **kwargs)
298 ##
300 def set_session(self, sess):
301 self.session = sess
302 self.user = sess.user
304 ##
306 _CMD_PARAM_NAME = 'cmd'
308 def _parse(self):
309 if not self._parsed:
310 self._parse2()
311 self._parsed = True
313 def _parse2(self):
314 # the server only understands requests to /_ or /_/commandName
315 # GET params can be given as query string or encoded in the path
316 # like _/commandName/param1/value1/param2/value2 etc
318 path = self._wz.path
319 path_parts = None
321 if path == gws.c.SERVER_ENDPOINT:
322 # example.com/_
323 # the cmd param is expected to be in the query string or json
324 cmd = ''
325 elif path.startswith(gws.c.SERVER_ENDPOINT + '/'):
326 # example.com/_/someCommand
327 # the cmd param is in the url
328 path_parts = path.split('/')
329 cmd = path_parts[2]
330 path_parts = path_parts[3:]
331 self._parsed_path = '/'.join(path_parts)
332 else:
333 raise error.NotFound(f'invalid request path: {path!r}')
335 if self.structInput:
336 self._parsed_struct = self._decode_struct(self.structInput)
337 self._parsed_command = cmd or self._parsed_struct.pop(self._CMD_PARAM_NAME, '')
338 else:
339 d = dict(self._wz.args)
340 if path_parts:
341 for n in range(1, len(path_parts), 2):
342 d[path_parts[n - 1]] = path_parts[n]
343 self._parsed_command = cmd or d.pop(self._CMD_PARAM_NAME, '')
344 self._parsed_params = d
345 self._parsed_params_lc = {k.lower(): v for k, v in d.items()}
346 self._parsed_query_params = dict(self._wz.args)
348 def _struct_type(self, header):
349 if header:
350 header = header.lower()
351 if header.startswith(self._struct_mime[self._STRUCT_JSON]):
352 return self._STRUCT_JSON
353 if header.startswith(self._struct_mime[self._STRUCT_MSGPACK]):
354 return self._STRUCT_MSGPACK
356 def _encode_struct(self, data, typ):
357 if typ == self._STRUCT_JSON:
358 return gws.lib.jsonx.to_string(data)
359 if typ == self._STRUCT_MSGPACK:
360 return umsgpack.dumps(data, default=gws.u.to_dict)
361 raise ValueError(f'invalid struct type {typ!r}')
363 def _decode_struct(self, typ):
364 if typ == self._STRUCT_JSON:
365 try:
366 data = gws.u.require(self.data())
367 s = data.decode(encoding='utf-8', errors='strict')
368 return gws.lib.jsonx.from_string(s)
369 except Exception as exc:
370 raise error.BadRequest('malformed json request') from exc
372 if typ == self._STRUCT_MSGPACK:
373 try:
374 data = gws.u.require(self.data())
375 return umsgpack.loads(data)
376 except Exception as exc:
377 raise error.BadRequest('malformed msgpack request') from exc
379 raise ValueError(f'invalid struct type {typ!r}')