Coverage for gws-app / gws / plugin / auth_method / web / core.py: 76%

185 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""Web authorisation method.""" 

2 

3import re 

4from typing import Optional, cast 

5 

6import gws 

7import gws.base.auth 

8import gws.base.web 

9 

10gws.ext.new.authMethod('web') 

11 

12 

13class LoginRedirectRule(gws.Data): 

14 """Login redirect rule.""" 

15 

16 pattern: Optional[gws.Regex] 

17 """URL matching pattern for pages that must be redirected.""" 

18 target: str 

19 """Target url.""" 

20 

21 

22class Config(gws.base.auth.method.Config): 

23 """Web-based authorization options""" 

24 

25 cookieName: str = 'auth' 

26 """Name for the cookie.""" 

27 cookiePath: str = '/' 

28 """Cookie path.""" 

29 cookieSameSite: str = 'Lax' 

30 """Cookie SameSite attribute.""" 

31 loginRedirect: Optional[LoginRedirectRule] 

32 """Rule to redirect if a login is required.""" 

33 

34 

35## 

36 

37 

38class UserResponse(gws.Response): 

39 user: Optional[gws.base.auth.user.Props] 

40 

41 

42class LogoutResponse(gws.Response): 

43 pass 

44 

45 

46class LoginRequest(gws.Request): 

47 username: str 

48 password: str 

49 

50 

51class LoginResponse(gws.Response): 

52 user: Optional[gws.base.auth.user.Props] 

53 mfaState: Optional[gws.AuthMultiFactorState] 

54 mfaMessage: str = '' 

55 mfaCanRestart: bool = False 

56 

57 

58class MfaVerifyRequest(gws.Request): 

59 payload: dict 

60 

61 

62## 

63 

64_DELETED_SESSION = 'web:deleted' 

65 

66 

67class Object(gws.base.auth.method.Object): 

68 cookieName: str 

69 cookiePath: str 

70 cookieSameSite: str 

71 loginRedirect: Optional[LoginRedirectRule] 

72 

73 deletedSession: gws.base.auth.session.Object 

74 

75 def configure(self): 

76 self.uid = 'gws.plugin.auth.method.web' 

77 self.cookieName = self.cfg('cookieName', default=Config.cookieName) 

78 self.cookiePath = self.cfg('cookiePath', default=Config.cookiePath) 

79 self.cookieSameSite = self.cfg('cookieSameSite', default=Config.cookieSameSite) 

80 self.loginRedirect = self.cfg('loginRedirect') 

81 self.root.app.middlewareMgr.register(self, self.uid, depends_on=['auth']) 

82 

83 ## 

84 

85 def exit_middleware(self, req, res): 

86 if res.status in (403, 401) and req.isGet: 

87 self._check_login_redirect(req, res) 

88 

89 def _check_login_redirect(self, req: gws.WebRequester, res: gws.WebResponder): 

90 lr = self.loginRedirect 

91 if not lr: 

92 return 

93 request_uri = req.env('REQUEST_URI', '') 

94 if not request_uri: 

95 return 

96 if lr.pattern and not re.match(lr.pattern, request_uri): 

97 return 

98 redir = req.url_for(lr.target, to=request_uri) 

99 res.set_status(302) 

100 res.add_header('Location', redir) 

101 res.set_body(f'Redirecting to {redir}...') 

102 gws.log.debug(f'auth web: redirect {res.status=} {redir=}') 

103 

104 def activate(self): 

105 am = self.root.app.authMgr 

106 self.deletedSession = gws.base.auth.session.Object( 

107 uid=_DELETED_SESSION, 

108 method=self, 

109 user=am.guestUser, 

110 ) 

111 

112 def open_session(self, req): 

113 am = self.root.app.authMgr 

114 

115 sid = req.cookie(self.cookieName) 

116 if not sid: 

117 return 

118 

119 sess = am.sessionMgr.get_valid(sid) 

120 

121 if not sess: 

122 gws.log.debug(f'open_session: {sid=} not found or invalid') 

123 return self.deletedSession 

124 

125 return sess 

126 

127 def close_session(self, req, res): 

128 am = self.root.app.authMgr 

129 

130 sess = getattr(req, 'session') 

131 if not sess: 

132 return 

133 

134 if sess.uid == _DELETED_SESSION: 

135 gws.log.debug('session cookie=deleted') 

136 res.delete_cookie( 

137 self.cookieName, 

138 path=self.cookiePath, 

139 ) 

140 return 

141 

142 if res.status < 400: 

143 gws.log.debug(f'session cookie={sess.uid!r}') 

144 res.set_cookie( 

145 self.cookieName, 

146 sess.uid, 

147 path=self.cookiePath, 

148 secure=self.secure, 

149 samesite=self.cookieSameSite, 

150 httponly=True, 

151 ) 

152 am.sessionMgr.save(sess) 

153 

154 def handle_login(self, req: gws.WebRequester, p: LoginRequest) -> LoginResponse: 

155 if not req.user.isGuest: 

156 raise gws.ForbiddenError(f'login: already logged-in {req.user.uid=}') 

157 

158 if self.secure and not req.isSecure: 

159 raise gws.ForbiddenError('login: insecure_context, ignored') 

160 

161 user = self.root.app.authMgr.authenticate(self, p) 

162 if not user: 

163 raise gws.ForbiddenError('login: user not found') 

164 

165 if user.mfaUid: 

166 mfa = self._mfa_start(req, user) 

167 gws.log.info(f'LOGGED_IN (MFA pending): {user.uid=} {user.roles=}') 

168 return self._mfa_response(mfa) 

169 

170 self._finalize_login(req, user) 

171 return LoginResponse(user=gws.props_of(user, user)) 

172 

173 def handle_mfa_verify(self, req: gws.WebRequester, p: MfaVerifyRequest) -> LoginResponse: 

174 try: 

175 mfa = self._mfa_verify(req, p.payload) 

176 except gws.ForbiddenError: 

177 self._delete_session(req) 

178 raise 

179 

180 if mfa.state == gws.AuthMultiFactorState.ok: 

181 self._finalize_login(req, mfa.user) 

182 return self._mfa_response(mfa) 

183 

184 if mfa.state == gws.AuthMultiFactorState.retry: 

185 return self._mfa_response(mfa) 

186 

187 self._delete_session(req) 

188 raise gws.ForbiddenError(f'MFA: verify failed {mfa.state=}') 

189 

190 def handle_mfa_restart(self, req: gws.WebRequester, p: gws.Request) -> LoginResponse: 

191 try: 

192 mfa = self._mfa_restart(req) 

193 except gws.ForbiddenError: 

194 self._delete_session(req) 

195 raise 

196 

197 return self._mfa_response(mfa) 

198 

199 def handle_logout(self, req: gws.WebRequester) -> LogoutResponse: 

200 if req.user.isGuest: 

201 self._delete_session(req) 

202 return LogoutResponse() 

203 

204 if req.session.method != self: 

205 raise gws.ForbiddenError(f'wrong method for logout: {req.session.method!r}') 

206 

207 self._delete_session(req) 

208 

209 gws.log.info(f'LOGGED_OUT: user={req.user.uid!r}') 

210 return LogoutResponse() 

211 

212 ## 

213 

214 def _delete_session(self, req: gws.WebRequester): 

215 am = self.root.app.authMgr 

216 am.sessionMgr.delete(req.session) 

217 req.set_session(self.deletedSession) 

218 

219 def _finalize_login(self, req: gws.WebRequester, user: gws.User): 

220 self._delete_session(req) 

221 am = self.root.app.authMgr 

222 req.set_session(am.sessionMgr.create(self, user)) 

223 gws.log.info(f'LOGGED_IN: {user.uid=} {user.roles=}') 

224 

225 ## 

226 

227 def _mfa_start(self, req: gws.WebRequester, user: gws.User) -> gws.AuthMultiFactorTransaction: 

228 am = self.root.app.authMgr 

229 

230 adapter = am.get_multi_factor_adapter(user.mfaUid) 

231 if not adapter: 

232 raise gws.ForbiddenError(f'MFA: {user.mfaUid=} unknown') 

233 

234 mfa = adapter.start(user) 

235 if not mfa: 

236 raise gws.ForbiddenError(f'MFA: {user.mfaUid=} start failed') 

237 

238 req.set_session(am.sessionMgr.create(self, am.guestUser)) 

239 

240 self._mfa_store(req, mfa) 

241 return mfa 

242 

243 def _mfa_verify(self, req: gws.WebRequester, payload: dict) -> gws.AuthMultiFactorTransaction: 

244 mfa = self._mfa_load(req) 

245 mfa = mfa.adapter.verify(mfa, payload) 

246 

247 self._mfa_store(req, mfa) 

248 return mfa 

249 

250 def _mfa_restart(self, req: gws.WebRequester) -> gws.AuthMultiFactorTransaction: 

251 mfa = self._mfa_load(req) 

252 mfa = mfa.adapter.restart(mfa) 

253 if not mfa: 

254 raise gws.ForbiddenError(f'MFA: restart failed') 

255 

256 self._mfa_store(req, mfa) 

257 return mfa 

258 

259 def _mfa_store(self, req: gws.WebRequester, mfa: gws.AuthMultiFactorTransaction): 

260 am = self.root.app.authMgr 

261 

262 sess_mfa = gws.u.merge({}, mfa) 

263 sess_mfa['user'] = am.serialize_user(mfa.user) 

264 sess_mfa['adapter'] = mfa.adapter.uid 

265 req.session.set('AuthMultiFactorTransaction', sess_mfa) 

266 

267 def _mfa_load(self, req: gws.WebRequester) -> gws.AuthMultiFactorTransaction: 

268 am = self.root.app.authMgr 

269 

270 sess_mfa = req.session.get('AuthMultiFactorTransaction') 

271 if not sess_mfa: 

272 raise gws.ForbiddenError(f'MFA: transaction not found') 

273 

274 mfa = gws.AuthMultiFactorTransaction(sess_mfa) 

275 mfa.adapter = gws.u.require(am.get_multi_factor_adapter(sess_mfa['adapter'])) 

276 mfa.user = gws.u.require(am.unserialize_user(sess_mfa['user'])) 

277 

278 if not mfa.adapter.check_state(mfa): 

279 raise gws.ForbiddenError(f'MFA: invalid transaction in session') 

280 

281 return mfa 

282 

283 def _mfa_response(self, mfa: gws.AuthMultiFactorTransaction) -> LoginResponse: 

284 return LoginResponse( 

285 mfaState=mfa.state, 

286 mfaMessage=mfa.message, 

287 mfaCanRestart=mfa.adapter.check_restart(mfa), 

288 )