Coverage for gws-app / gws / plugin / auth_method / web / core.py: 76%
185 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Web authorisation method."""
3import re
4from typing import Optional, cast
6import gws
7import gws.base.auth
8import gws.base.web
10gws.ext.new.authMethod('web')
13class LoginRedirectRule(gws.Data):
14 """Login redirect rule."""
16 pattern: Optional[gws.Regex]
17 """URL matching pattern for pages that must be redirected."""
18 target: str
19 """Target url."""
22class Config(gws.base.auth.method.Config):
23 """Web-based authorization options"""
25 cookieName: str = 'auth'
26 """Name for the cookie."""
27 cookiePath: str = '/'
28 """Cookie path."""
29 cookieSameSite: str = 'Lax'
30 """Cookie SameSite attribute."""
31 loginRedirect: Optional[LoginRedirectRule]
32 """Rule to redirect if a login is required."""
35##
38class UserResponse(gws.Response):
39 user: Optional[gws.base.auth.user.Props]
42class LogoutResponse(gws.Response):
43 pass
46class LoginRequest(gws.Request):
47 username: str
48 password: str
51class LoginResponse(gws.Response):
52 user: Optional[gws.base.auth.user.Props]
53 mfaState: Optional[gws.AuthMultiFactorState]
54 mfaMessage: str = ''
55 mfaCanRestart: bool = False
58class MfaVerifyRequest(gws.Request):
59 payload: dict
62##
64_DELETED_SESSION = 'web:deleted'
67class Object(gws.base.auth.method.Object):
68 cookieName: str
69 cookiePath: str
70 cookieSameSite: str
71 loginRedirect: Optional[LoginRedirectRule]
73 deletedSession: gws.base.auth.session.Object
75 def configure(self):
76 self.uid = 'gws.plugin.auth.method.web'
77 self.cookieName = self.cfg('cookieName', default=Config.cookieName)
78 self.cookiePath = self.cfg('cookiePath', default=Config.cookiePath)
79 self.cookieSameSite = self.cfg('cookieSameSite', default=Config.cookieSameSite)
80 self.loginRedirect = self.cfg('loginRedirect')
81 self.root.app.middlewareMgr.register(self, self.uid, depends_on=['auth'])
83 ##
85 def exit_middleware(self, req, res):
86 if res.status in (403, 401) and req.isGet:
87 self._check_login_redirect(req, res)
89 def _check_login_redirect(self, req: gws.WebRequester, res: gws.WebResponder):
90 lr = self.loginRedirect
91 if not lr:
92 return
93 request_uri = req.env('REQUEST_URI', '')
94 if not request_uri:
95 return
96 if lr.pattern and not re.match(lr.pattern, request_uri):
97 return
98 redir = req.url_for(lr.target, to=request_uri)
99 res.set_status(302)
100 res.add_header('Location', redir)
101 res.set_body(f'Redirecting to {redir}...')
102 gws.log.debug(f'auth web: redirect {res.status=} {redir=}')
104 def activate(self):
105 am = self.root.app.authMgr
106 self.deletedSession = gws.base.auth.session.Object(
107 uid=_DELETED_SESSION,
108 method=self,
109 user=am.guestUser,
110 )
112 def open_session(self, req):
113 am = self.root.app.authMgr
115 sid = req.cookie(self.cookieName)
116 if not sid:
117 return
119 sess = am.sessionMgr.get_valid(sid)
121 if not sess:
122 gws.log.debug(f'open_session: {sid=} not found or invalid')
123 return self.deletedSession
125 return sess
127 def close_session(self, req, res):
128 am = self.root.app.authMgr
130 sess = getattr(req, 'session')
131 if not sess:
132 return
134 if sess.uid == _DELETED_SESSION:
135 gws.log.debug('session cookie=deleted')
136 res.delete_cookie(
137 self.cookieName,
138 path=self.cookiePath,
139 )
140 return
142 if res.status < 400:
143 gws.log.debug(f'session cookie={sess.uid!r}')
144 res.set_cookie(
145 self.cookieName,
146 sess.uid,
147 path=self.cookiePath,
148 secure=self.secure,
149 samesite=self.cookieSameSite,
150 httponly=True,
151 )
152 am.sessionMgr.save(sess)
154 def handle_login(self, req: gws.WebRequester, p: LoginRequest) -> LoginResponse:
155 if not req.user.isGuest:
156 raise gws.ForbiddenError(f'login: already logged-in {req.user.uid=}')
158 if self.secure and not req.isSecure:
159 raise gws.ForbiddenError('login: insecure_context, ignored')
161 user = self.root.app.authMgr.authenticate(self, p)
162 if not user:
163 raise gws.ForbiddenError('login: user not found')
165 if user.mfaUid:
166 mfa = self._mfa_start(req, user)
167 gws.log.info(f'LOGGED_IN (MFA pending): {user.uid=} {user.roles=}')
168 return self._mfa_response(mfa)
170 self._finalize_login(req, user)
171 return LoginResponse(user=gws.props_of(user, user))
173 def handle_mfa_verify(self, req: gws.WebRequester, p: MfaVerifyRequest) -> LoginResponse:
174 try:
175 mfa = self._mfa_verify(req, p.payload)
176 except gws.ForbiddenError:
177 self._delete_session(req)
178 raise
180 if mfa.state == gws.AuthMultiFactorState.ok:
181 self._finalize_login(req, mfa.user)
182 return self._mfa_response(mfa)
184 if mfa.state == gws.AuthMultiFactorState.retry:
185 return self._mfa_response(mfa)
187 self._delete_session(req)
188 raise gws.ForbiddenError(f'MFA: verify failed {mfa.state=}')
190 def handle_mfa_restart(self, req: gws.WebRequester, p: gws.Request) -> LoginResponse:
191 try:
192 mfa = self._mfa_restart(req)
193 except gws.ForbiddenError:
194 self._delete_session(req)
195 raise
197 return self._mfa_response(mfa)
199 def handle_logout(self, req: gws.WebRequester) -> LogoutResponse:
200 if req.user.isGuest:
201 self._delete_session(req)
202 return LogoutResponse()
204 if req.session.method != self:
205 raise gws.ForbiddenError(f'wrong method for logout: {req.session.method!r}')
207 self._delete_session(req)
209 gws.log.info(f'LOGGED_OUT: user={req.user.uid!r}')
210 return LogoutResponse()
212 ##
214 def _delete_session(self, req: gws.WebRequester):
215 am = self.root.app.authMgr
216 am.sessionMgr.delete(req.session)
217 req.set_session(self.deletedSession)
219 def _finalize_login(self, req: gws.WebRequester, user: gws.User):
220 self._delete_session(req)
221 am = self.root.app.authMgr
222 req.set_session(am.sessionMgr.create(self, user))
223 gws.log.info(f'LOGGED_IN: {user.uid=} {user.roles=}')
225 ##
227 def _mfa_start(self, req: gws.WebRequester, user: gws.User) -> gws.AuthMultiFactorTransaction:
228 am = self.root.app.authMgr
230 adapter = am.get_multi_factor_adapter(user.mfaUid)
231 if not adapter:
232 raise gws.ForbiddenError(f'MFA: {user.mfaUid=} unknown')
234 mfa = adapter.start(user)
235 if not mfa:
236 raise gws.ForbiddenError(f'MFA: {user.mfaUid=} start failed')
238 req.set_session(am.sessionMgr.create(self, am.guestUser))
240 self._mfa_store(req, mfa)
241 return mfa
243 def _mfa_verify(self, req: gws.WebRequester, payload: dict) -> gws.AuthMultiFactorTransaction:
244 mfa = self._mfa_load(req)
245 mfa = mfa.adapter.verify(mfa, payload)
247 self._mfa_store(req, mfa)
248 return mfa
250 def _mfa_restart(self, req: gws.WebRequester) -> gws.AuthMultiFactorTransaction:
251 mfa = self._mfa_load(req)
252 mfa = mfa.adapter.restart(mfa)
253 if not mfa:
254 raise gws.ForbiddenError(f'MFA: restart failed')
256 self._mfa_store(req, mfa)
257 return mfa
259 def _mfa_store(self, req: gws.WebRequester, mfa: gws.AuthMultiFactorTransaction):
260 am = self.root.app.authMgr
262 sess_mfa = gws.u.merge({}, mfa)
263 sess_mfa['user'] = am.serialize_user(mfa.user)
264 sess_mfa['adapter'] = mfa.adapter.uid
265 req.session.set('AuthMultiFactorTransaction', sess_mfa)
267 def _mfa_load(self, req: gws.WebRequester) -> gws.AuthMultiFactorTransaction:
268 am = self.root.app.authMgr
270 sess_mfa = req.session.get('AuthMultiFactorTransaction')
271 if not sess_mfa:
272 raise gws.ForbiddenError(f'MFA: transaction not found')
274 mfa = gws.AuthMultiFactorTransaction(sess_mfa)
275 mfa.adapter = gws.u.require(am.get_multi_factor_adapter(sess_mfa['adapter']))
276 mfa.user = gws.u.require(am.unserialize_user(sess_mfa['user']))
278 if not mfa.adapter.check_state(mfa):
279 raise gws.ForbiddenError(f'MFA: invalid transaction in session')
281 return mfa
283 def _mfa_response(self, mfa: gws.AuthMultiFactorTransaction) -> LoginResponse:
284 return LoginResponse(
285 mfaState=mfa.state,
286 mfaMessage=mfa.message,
287 mfaCanRestart=mfa.adapter.check_restart(mfa),
288 )