Coverage for gws-app/gws/base/web/wsgi.py: 70%

200 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""Basic WSGI request/response handling.""" 

2 

3import gzip 

4import io 

5import os 

6import werkzeug.utils 

7import werkzeug.wrappers 

8import werkzeug.wsgi 

9 

10import gws 

11import gws.lib.jsonx 

12import gws.lib.mime 

13import gws.lib.vendor.umsgpack as umsgpack 

14 

15from . import error 

16 

17 

18class Responder(gws.WebResponder): 

19 def __init__(self, **kwargs): 

20 if 'wz' in kwargs: 

21 self._wz = kwargs['wz'] 

22 else: 

23 self._wz = werkzeug.wrappers.Response(**kwargs) 

24 self.response = kwargs.get('response') 

25 self.status = self._wz.status_code 

26 

27 def __repr__(self): 

28 return f'<Responder {self._wz}>' 

29 

30 def send_response(self, environ, start_response): 

31 return self._wz(environ, start_response) 

32 

33 def set_cookie(self, key, value, **kwargs): 

34 self._wz.set_cookie(key, value, **kwargs) 

35 

36 def delete_cookie(self, key, **kwargs): 

37 self._wz.delete_cookie(key, **kwargs) 

38 

39 def add_header(self, key, value): 

40 self._wz.headers.add(key, value) 

41 

42 def set_status(self, status): 

43 self._wz.status_code = int(status) 

44 

45 

46class Requester(gws.WebRequester): 

47 _struct_mime = { 

48 'json': 'application/json', 

49 'msgpack': 'application/msgpack', 

50 } 

51 

52 def __init__(self, root: gws.Root, environ: dict, site: gws.WebSite, **kwargs): 

53 if 'wz' in kwargs: 

54 self._wz = kwargs['wz'] 

55 else: 

56 self._wz = werkzeug.wrappers.Request(environ) 

57 

58 # this is also set in nginx (see server/ini), but we need this for unzipping (see data() below) 

59 self._wz.max_content_length = int(root.app.cfg('server.web.maxRequestLength', default=1)) * 1024 * 1024 

60 

61 self.root = root 

62 self.site = site 

63 

64 self.environ = self._wz.environ 

65 self.method = self._wz.method.upper() 

66 self.isSecure = self._wz.is_secure 

67 

68 self.session = root.app.authMgr.guestSession 

69 self.user = root.app.authMgr.guestUser 

70 

71 self.isPost = self.method == 'POST' 

72 self.isGet = self.method == 'GET' 

73 

74 self.contentType = gws.lib.mime.get(self.header('content-type')) or gws.lib.mime.BIN 

75 

76 self.inputType = None 

77 if self.isPost: 

78 self.inputType = self._struct_type(self.header('content-type')) 

79 

80 self.outputType = None 

81 if self.inputType: 

82 self.outputType = self._struct_type(self.header('accept')) or self.inputType 

83 

84 self.isApi = self.inputType is not None 

85 

86 self._parsed_params = {} 

87 self._parsed_params_lc = {} 

88 self._parsed_struct = {} 

89 self._parsed_command = '' 

90 self._parsed = False 

91 self._uid = gws.u.mstime() 

92 

93 if self.root.app.developer_option('request.log_all'): 

94 u = { 

95 'method': self.method, 

96 'path': self._wz.path, 

97 'query': self._wz.query_string, 

98 'cookies': self._wz.cookies, 

99 'headers': self._wz.headers, 

100 'environ': self._wz.environ, 

101 } 

102 gws.u.write_debug_file(f'request_{self._uid}', ''.join(f'{k}={v!r}\n' for k, v in u.items())) 

103 

104 def __repr__(self): 

105 return f'<Requester {self._wz}>' 

106 

107 def params(self): 

108 self._parse() 

109 return self._parsed_params 

110 

111 def struct(self): 

112 self._parse() 

113 return self._parsed_struct 

114 

115 def command(self): 

116 self._parse() 

117 return self._parsed_command 

118 

119 def data(self): 

120 if not self.isPost: 

121 return None 

122 

123 data = self._wz.get_data(as_text=False, parse_form_data=False) 

124 

125 if self.root.app.developer_option('request.log_all'): 

126 gws.u.write_debug_file(f'request_{self._uid}.data', data) 

127 

128 if self.header('content-encoding') == 'gzip': 

129 with gzip.GzipFile(fileobj=io.BytesIO(data)) as fp: 

130 return fp.read(self._wz.max_content_length) 

131 

132 return data 

133 

134 def text(self): 

135 data = self.data() 

136 if data is None: 

137 return None 

138 

139 charset = self.header('charset', 'utf-8') 

140 try: 

141 return data.decode(encoding=charset, errors='strict') 

142 except UnicodeDecodeError as exc: 

143 gws.log.error('post data decoding error') 

144 raise error.BadRequest() from exc 

145 

146 def env(self, key, default=''): 

147 return self._wz.environ.get(key, default) 

148 

149 def has_param(self, key): 

150 self._parse() 

151 return key.lower() in self._parsed_params_lc 

152 

153 def param(self, key, default=''): 

154 return self._parsed_params_lc.get(key.lower(), default) 

155 

156 def header(self, key, default=''): 

157 return self._wz.headers.get(key, default) 

158 

159 def cookie(self, key, default=''): 

160 return self._wz.cookies.get(key, default) 

161 

162 def content_responder(self, response): 

163 args: dict = { 

164 'mimetype': response.mime, 

165 'status': response.status or 200, 

166 'headers': {}, 

167 'direct_passthrough': False, 

168 } 

169 

170 attach_name = None 

171 

172 if response.attachmentName: 

173 attach_name = response.attachmentName 

174 elif response.asAttachment: 

175 if response.contentPath: 

176 attach_name = os.path.basename(response.contentPath) 

177 elif response.mime: 

178 ext = gws.lib.mime.extension_for(response.mime) 

179 attach_name = 'download.' + (ext or 'bin') 

180 else: 

181 attach_name = 'download' 

182 

183 if attach_name: 

184 args['headers']['Content-Disposition'] = f'attachment; filename="{attach_name}"' 

185 args['mimetype'] = args['mimetype'] or gws.lib.mime.for_path(attach_name) 

186 

187 if response.contentPath: 

188 args['response'] = werkzeug.wsgi.wrap_file(self.environ, open(response.contentPath, 'rb')) 

189 args['headers']['Content-Length'] = str(os.path.getsize(response.contentPath)) 

190 args['mimetype'] = args['mimetype'] or gws.lib.mime.for_path(response.contentPath) 

191 args['direct_passthrough'] = True 

192 else: 

193 args['response'] = response.content 

194 

195 if response.headers: 

196 args['headers'].update(response.headers) 

197 

198 return Responder(**args) 

199 

200 def redirect_responder(self, response): 

201 wz = werkzeug.utils.redirect(response.location, response.status or 302) 

202 if response.headers: 

203 wz.headers.update(response.headers) 

204 return Responder(wz=wz) 

205 

206 def api_responder(self, response): 

207 typ = self.outputType or 'json' 

208 return Responder( 

209 response=self._encode_struct(response, typ), 

210 mimetype=self._struct_mime[typ], 

211 status=response.status or 200, 

212 ) 

213 

214 def error_responder(self, exc): 

215 err = exc if isinstance(exc, error.HTTPException) else error.InternalServerError() 

216 return Responder(wz=err.get_response(self._wz.environ)) 

217 

218 ## 

219 

220 def url_for(self, path, **kwargs): 

221 return self.site.url_for(self, path, **kwargs) 

222 

223 ## 

224 

225 def set_session(self, sess): 

226 self.session = sess 

227 self.user = sess.user 

228 

229 ## 

230 

231 _cmd_param_name = 'cmd' 

232 

233 def _parse(self): 

234 if self._parsed: 

235 return 

236 

237 # the server only understands requests to /_ or /_/commandName 

238 # GET params can be given as query string or encoded in the path 

239 # like _/commandName/param1/value1/param2/value2 etc 

240 

241 path = self._wz.path 

242 path_parts = None 

243 

244 if path == gws.c.SERVER_ENDPOINT: 

245 # example.com/_ 

246 # the cmd param is expected to be in the query string or json 

247 cmd = '' 

248 elif path.startswith(gws.c.SERVER_ENDPOINT + '/'): 

249 # example.com/_/someCommand 

250 # the cmd param is in the url 

251 path_parts = path.split('/') 

252 cmd = path_parts[2] 

253 path_parts = path_parts[3:] 

254 else: 

255 raise error.NotFound(f'invalid request path: {path!r}') 

256 

257 if self.inputType: 

258 self._parsed_struct = self._decode_struct(self.inputType) 

259 self._parsed_command = cmd or self._parsed_struct.pop(self._cmd_param_name, '') 

260 else: 

261 d = dict(self._wz.args) 

262 if path_parts: 

263 for n in range(1, len(path_parts), 2): 

264 d[path_parts[n - 1]] = path_parts[n] 

265 self._parsed_command = cmd or d.pop(self._cmd_param_name, '') 

266 self._parsed_params = d 

267 self._parsed_params_lc = {k.lower(): v for k, v in d.items()} 

268 

269 self._parsed = True 

270 

271 def _struct_type(self, header): 

272 if header: 

273 header = header.lower() 

274 if header.startswith(self._struct_mime['json']): 

275 return 'json' 

276 if header.startswith(self._struct_mime['msgpack']): 

277 return 'msgpack' 

278 

279 def _encode_struct(self, data, typ): 

280 if typ == 'json': 

281 return gws.lib.jsonx.to_string(data) 

282 if typ == 'msgpack': 

283 return umsgpack.dumps(data, default=gws.u.to_dict) 

284 raise ValueError('invalid struct type') 

285 

286 def _decode_struct(self, typ): 

287 if typ == 'json': 

288 try: 

289 data = gws.u.require(self.data()) 

290 s = data.decode(encoding='utf-8', errors='strict') 

291 return gws.lib.jsonx.from_string(s) 

292 except (UnicodeDecodeError, gws.lib.jsonx.Error): 

293 gws.log.error('malformed json request') 

294 raise error.BadRequest() 

295 

296 if typ == 'msgpack': 

297 try: 

298 data = gws.u.require(self.data()) 

299 return umsgpack.loads(data) 

300 except (TypeError, umsgpack.UnpackException): 

301 gws.log.error('malformed msgpack request') 

302 raise error.BadRequest() 

303 

304 gws.log.error('invalid struct type') 

305 raise error.BadRequest()