Coverage for gws-app / gws / base / auth / manager.py: 85%
99 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Authorization and session manager."""
3from typing import Optional, cast
5import gws
6import gws.config
7import gws.lib.jsonx
9from . import session, system_provider
12class Config(gws.Config):
13 """Authentication and authorization options"""
15 methods: Optional[list[gws.ext.config.authMethod]]
16 """Authorization methods."""
17 providers: Optional[list[gws.ext.config.authProvider]]
18 """Authorization providers."""
19 mfa: Optional[list[gws.ext.config.authMultiFactorAdapter]]
20 """Authorization providers."""
21 session: Optional[gws.ext.config.authSessionManager]
22 """Session options."""
25_DEFAULT_SESSION_TYPE = 'sqlite'
28class Object(gws.AuthManager):
29 """Authorization manager."""
31 def configure(self):
32 self.sessionMgr = self.create_child(gws.ext.object.authSessionManager, self.cfg('session'), type=_DEFAULT_SESSION_TYPE)
34 self.providers = self.create_children(gws.ext.object.authProvider, self.cfg('providers'))
36 sys_provider = self.create_child(system_provider.Object)
37 self.providers.append(sys_provider)
39 self.guestUser = sys_provider.get_user('guest')
40 self.systemUser = sys_provider.get_user('system')
42 self.methods = self.create_children(gws.ext.object.authMethod, self.cfg('methods'))
43 if not self.methods:
44 # if no methods configured, enable the Web method
45 self.methods.append(self.create_child(gws.ext.object.authMethod, type='web'))
47 self.mfAdapters = self.create_children(gws.ext.object.authMultiFactorAdapter, self.cfg('mfa'))
49 self.guestSession = session.Object(uid='guest_session', method=None, user=self.guestUser)
51 self.root.app.middlewareMgr.register(self, 'auth', depends_on=['db'])
53 ##
55 def enter_middleware(self, req: gws.WebRequester):
56 sess = self._try_open_session(req) or self.guestSession
57 req.set_session(sess)
58 gws.log.debug(f'session opened: user={req.session.user.uid!r} roles={req.session.user.roles}')
60 def _try_open_session(self, req):
61 for meth in self.methods:
62 if not self._can_use_in_context(req, meth):
63 gws.log.warning(f'open_session: {meth=}: insecure_context, ignore')
64 continue
66 sess = meth.open_session(req)
67 if not sess:
68 continue
70 if not sess.user:
71 gws.log.warning(f'open_session: {meth=}: {sess.uid=} user not found')
72 self.sessionMgr.delete(sess)
73 return
75 if not sess.method or sess.method.uid != meth.uid:
76 gws.log.warning(f'open_session: {meth=}: {sess.uid=} wrong method {sess.method=}')
77 self.sessionMgr.delete(sess)
78 return
80 gws.log.debug(f'open_session: {meth=}: ok')
81 return sess
83 def _can_use_in_context(self, req: gws.WebRequester, meth: gws.AuthMethod):
84 if not meth.secure or req.isSecure:
85 return True
86 if not meth.allowInsecureFrom:
87 return False
88 ip = req.env('REMOTE_ADDR', '')
89 if ip not in meth.allowInsecureFrom:
90 return False
91 gws.log.warning(f'open_session: {meth=}: insecure_context allowed from {ip=}')
92 return True
94 def exit_middleware(self, req: gws.WebRequester, res: gws.WebResponder):
95 sess = req.session
96 if sess.method:
97 sess.method.close_session(req, res)
98 req.set_session(self.guestSession)
100 ##
102 def authenticate(self, method, credentials):
103 for prov in self.providers:
104 if prov.allowedMethods and method.extType not in prov.allowedMethods:
105 continue
106 gws.log.debug(f'trying provider {prov!r}')
107 user = prov.authenticate(method, credentials)
108 if user:
109 gws.log.debug(f'ok provider {prov!r}')
110 return user
112 ##
114 def get_user(self, user_uid):
115 provider_uid, local_uid = gws.u.split_uid(user_uid)
116 prov = self.get_provider(provider_uid)
117 return prov.get_user(local_uid) if prov else None
119 def get_provider(self, uid):
120 for obj in self.providers:
121 if obj.uid == uid:
122 return obj
124 def get_method(self, uid=None, ext_type=None):
125 for obj in self.methods:
126 if obj.uid == uid:
127 return obj
129 def get_multi_factor_adapter(self, uid=None, ext_type=None):
130 for obj in self.mfAdapters:
131 if obj.uid == uid:
132 return obj
134 def serialize_user(self, user):
135 return gws.lib.jsonx.to_string([user.authProvider.uid, user.authProvider.serialize_user(user)])
137 def unserialize_user(self, data):
138 provider_uid, ds = gws.lib.jsonx.from_string(data)
139 prov = self.get_provider(provider_uid)
140 return prov.unserialize_user(ds) if prov else None
142 def is_public_object(self, obj, *context):
143 return self.guestUser.can_read(obj, *context)