Coverage for gws-app/gws/base/web/wsgi.py: 70%
200 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""Basic WSGI request/response handling."""
3import gzip
4import io
5import os
6import werkzeug.utils
7import werkzeug.wrappers
8import werkzeug.wsgi
10import gws
11import gws.lib.jsonx
12import gws.lib.mime
13import gws.lib.vendor.umsgpack as umsgpack
15from . import error
18class Responder(gws.WebResponder):
19 def __init__(self, **kwargs):
20 if 'wz' in kwargs:
21 self._wz = kwargs['wz']
22 else:
23 self._wz = werkzeug.wrappers.Response(**kwargs)
24 self.response = kwargs.get('response')
25 self.status = self._wz.status_code
27 def __repr__(self):
28 return f'<Responder {self._wz}>'
30 def send_response(self, environ, start_response):
31 return self._wz(environ, start_response)
33 def set_cookie(self, key, value, **kwargs):
34 self._wz.set_cookie(key, value, **kwargs)
36 def delete_cookie(self, key, **kwargs):
37 self._wz.delete_cookie(key, **kwargs)
39 def add_header(self, key, value):
40 self._wz.headers.add(key, value)
42 def set_status(self, status):
43 self._wz.status_code = int(status)
46class Requester(gws.WebRequester):
47 _struct_mime = {
48 'json': 'application/json',
49 'msgpack': 'application/msgpack',
50 }
52 def __init__(self, root: gws.Root, environ: dict, site: gws.WebSite, **kwargs):
53 if 'wz' in kwargs:
54 self._wz = kwargs['wz']
55 else:
56 self._wz = werkzeug.wrappers.Request(environ)
58 # this is also set in nginx (see server/ini), but we need this for unzipping (see data() below)
59 self._wz.max_content_length = int(root.app.cfg('server.web.maxRequestLength', default=1)) * 1024 * 1024
61 self.root = root
62 self.site = site
64 self.environ = self._wz.environ
65 self.method = self._wz.method.upper()
66 self.isSecure = self._wz.is_secure
68 self.session = root.app.authMgr.guestSession
69 self.user = root.app.authMgr.guestUser
71 self.isPost = self.method == 'POST'
72 self.isGet = self.method == 'GET'
74 self.contentType = gws.lib.mime.get(self.header('content-type')) or gws.lib.mime.BIN
76 self.inputType = None
77 if self.isPost:
78 self.inputType = self._struct_type(self.header('content-type'))
80 self.outputType = None
81 if self.inputType:
82 self.outputType = self._struct_type(self.header('accept')) or self.inputType
84 self.isApi = self.inputType is not None
86 self._parsed_params = {}
87 self._parsed_params_lc = {}
88 self._parsed_struct = {}
89 self._parsed_command = ''
90 self._parsed = False
91 self._uid = gws.u.mstime()
93 if self.root.app.developer_option('request.log_all'):
94 u = {
95 'method': self.method,
96 'path': self._wz.path,
97 'query': self._wz.query_string,
98 'cookies': self._wz.cookies,
99 'headers': self._wz.headers,
100 'environ': self._wz.environ,
101 }
102 gws.u.write_debug_file(f'request_{self._uid}', ''.join(f'{k}={v!r}\n' for k, v in u.items()))
104 def __repr__(self):
105 return f'<Requester {self._wz}>'
107 def params(self):
108 self._parse()
109 return self._parsed_params
111 def struct(self):
112 self._parse()
113 return self._parsed_struct
115 def command(self):
116 self._parse()
117 return self._parsed_command
119 def data(self):
120 if not self.isPost:
121 return None
123 data = self._wz.get_data(as_text=False, parse_form_data=False)
125 if self.root.app.developer_option('request.log_all'):
126 gws.u.write_debug_file(f'request_{self._uid}.data', data)
128 if self.header('content-encoding') == 'gzip':
129 with gzip.GzipFile(fileobj=io.BytesIO(data)) as fp:
130 return fp.read(self._wz.max_content_length)
132 return data
134 def text(self):
135 data = self.data()
136 if data is None:
137 return None
139 charset = self.header('charset', 'utf-8')
140 try:
141 return data.decode(encoding=charset, errors='strict')
142 except UnicodeDecodeError as exc:
143 gws.log.error('post data decoding error')
144 raise error.BadRequest() from exc
146 def env(self, key, default=''):
147 return self._wz.environ.get(key, default)
149 def has_param(self, key):
150 self._parse()
151 return key.lower() in self._parsed_params_lc
153 def param(self, key, default=''):
154 return self._parsed_params_lc.get(key.lower(), default)
156 def header(self, key, default=''):
157 return self._wz.headers.get(key, default)
159 def cookie(self, key, default=''):
160 return self._wz.cookies.get(key, default)
162 def content_responder(self, response):
163 args: dict = {
164 'mimetype': response.mime,
165 'status': response.status or 200,
166 'headers': {},
167 'direct_passthrough': False,
168 }
170 attach_name = None
172 if response.attachmentName:
173 attach_name = response.attachmentName
174 elif response.asAttachment:
175 if response.contentPath:
176 attach_name = os.path.basename(response.contentPath)
177 elif response.mime:
178 ext = gws.lib.mime.extension_for(response.mime)
179 attach_name = 'download.' + (ext or 'bin')
180 else:
181 attach_name = 'download'
183 if attach_name:
184 args['headers']['Content-Disposition'] = f'attachment; filename="{attach_name}"'
185 args['mimetype'] = args['mimetype'] or gws.lib.mime.for_path(attach_name)
187 if response.contentPath:
188 args['response'] = werkzeug.wsgi.wrap_file(self.environ, open(response.contentPath, 'rb'))
189 args['headers']['Content-Length'] = str(os.path.getsize(response.contentPath))
190 args['mimetype'] = args['mimetype'] or gws.lib.mime.for_path(response.contentPath)
191 args['direct_passthrough'] = True
192 else:
193 args['response'] = response.content
195 if response.headers:
196 args['headers'].update(response.headers)
198 return Responder(**args)
200 def redirect_responder(self, response):
201 wz = werkzeug.utils.redirect(response.location, response.status or 302)
202 if response.headers:
203 wz.headers.update(response.headers)
204 return Responder(wz=wz)
206 def api_responder(self, response):
207 typ = self.outputType or 'json'
208 return Responder(
209 response=self._encode_struct(response, typ),
210 mimetype=self._struct_mime[typ],
211 status=response.status or 200,
212 )
214 def error_responder(self, exc):
215 err = exc if isinstance(exc, error.HTTPException) else error.InternalServerError()
216 return Responder(wz=err.get_response(self._wz.environ))
218 ##
220 def url_for(self, path, **kwargs):
221 return self.site.url_for(self, path, **kwargs)
223 ##
225 def set_session(self, sess):
226 self.session = sess
227 self.user = sess.user
229 ##
231 _cmd_param_name = 'cmd'
233 def _parse(self):
234 if self._parsed:
235 return
237 # the server only understands requests to /_ or /_/commandName
238 # GET params can be given as query string or encoded in the path
239 # like _/commandName/param1/value1/param2/value2 etc
241 path = self._wz.path
242 path_parts = None
244 if path == gws.c.SERVER_ENDPOINT:
245 # example.com/_
246 # the cmd param is expected to be in the query string or json
247 cmd = ''
248 elif path.startswith(gws.c.SERVER_ENDPOINT + '/'):
249 # example.com/_/someCommand
250 # the cmd param is in the url
251 path_parts = path.split('/')
252 cmd = path_parts[2]
253 path_parts = path_parts[3:]
254 else:
255 raise error.NotFound(f'invalid request path: {path!r}')
257 if self.inputType:
258 self._parsed_struct = self._decode_struct(self.inputType)
259 self._parsed_command = cmd or self._parsed_struct.pop(self._cmd_param_name, '')
260 else:
261 d = dict(self._wz.args)
262 if path_parts:
263 for n in range(1, len(path_parts), 2):
264 d[path_parts[n - 1]] = path_parts[n]
265 self._parsed_command = cmd or d.pop(self._cmd_param_name, '')
266 self._parsed_params = d
267 self._parsed_params_lc = {k.lower(): v for k, v in d.items()}
269 self._parsed = True
271 def _struct_type(self, header):
272 if header:
273 header = header.lower()
274 if header.startswith(self._struct_mime['json']):
275 return 'json'
276 if header.startswith(self._struct_mime['msgpack']):
277 return 'msgpack'
279 def _encode_struct(self, data, typ):
280 if typ == 'json':
281 return gws.lib.jsonx.to_string(data)
282 if typ == 'msgpack':
283 return umsgpack.dumps(data, default=gws.u.to_dict)
284 raise ValueError('invalid struct type')
286 def _decode_struct(self, typ):
287 if typ == 'json':
288 try:
289 data = gws.u.require(self.data())
290 s = data.decode(encoding='utf-8', errors='strict')
291 return gws.lib.jsonx.from_string(s)
292 except (UnicodeDecodeError, gws.lib.jsonx.Error):
293 gws.log.error('malformed json request')
294 raise error.BadRequest()
296 if typ == 'msgpack':
297 try:
298 data = gws.u.require(self.data())
299 return umsgpack.loads(data)
300 except (TypeError, umsgpack.UnpackException):
301 gws.log.error('malformed msgpack request')
302 raise error.BadRequest()
304 gws.log.error('invalid struct type')
305 raise error.BadRequest()