Coverage for gws-app / gws / base / auth / manager.py: 85%

99 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""Authorization and session manager.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.config 

7import gws.lib.jsonx 

8 

9from . import session, system_provider 

10 

11 

class Config(gws.Config):
    """Authentication and authorization options"""

    methods: Optional[list[gws.ext.config.authMethod]]
    """Authorization methods."""
    providers: Optional[list[gws.ext.config.authProvider]]
    """Authorization providers."""
    mfa: Optional[list[gws.ext.config.authMultiFactorAdapter]]
    """Multi-factor authentication adapters."""
    session: Optional[gws.ext.config.authSessionManager]
    """Session options."""

23 

24 

# Passed as ``type`` when the manager creates its session manager child
# (presumably the fallback used when the 'session' config names no type -- TODO confirm).
_DEFAULT_SESSION_TYPE = 'sqlite'

26 

27 

class Object(gws.AuthManager):
    """Authorization manager.

    Owns the session manager, the authorization providers, methods and
    multi-factor adapters, and acts as the ``auth`` middleware that attaches
    a session to each web request.
    """

    def configure(self):
        """Create child objects from the configuration and register the middleware.

        A system provider is always appended after the configured providers;
        the guest and system users are taken from it. If no methods are
        configured, a single method of type ``web`` is created.
        """
        self.sessionMgr = self.create_child(gws.ext.object.authSessionManager, self.cfg('session'), type=_DEFAULT_SESSION_TYPE)

        self.providers = self.create_children(gws.ext.object.authProvider, self.cfg('providers'))

        sys_provider = self.create_child(system_provider.Object)
        self.providers.append(sys_provider)

        self.guestUser = sys_provider.get_user('guest')
        self.systemUser = sys_provider.get_user('system')

        self.methods = self.create_children(gws.ext.object.authMethod, self.cfg('methods'))
        if not self.methods:
            # if no methods configured, enable the Web method
            self.methods.append(self.create_child(gws.ext.object.authMethod, type='web'))

        self.mfAdapters = self.create_children(gws.ext.object.authMultiFactorAdapter, self.cfg('mfa'))

        # Shared session used for every request that has no session of its own.
        self.guestSession = session.Object(uid='guest_session', method=None, user=self.guestUser)

        self.root.app.middlewareMgr.register(self, 'auth', depends_on=['db'])

    ##

    def enter_middleware(self, req: gws.WebRequester):
        """Attach a session to the request, falling back to the guest session."""
        sess = self._try_open_session(req) or self.guestSession
        req.set_session(sess)
        gws.log.debug(f'session opened: user={req.session.user.uid!r} roles={req.session.user.roles}')

    def _try_open_session(self, req):
        """Ask each method in turn to open a session for the request.

        Methods that may not be used in the request's context are skipped.
        The first session returned by a method decides the outcome: if it has
        no user, or was not created by the same method, it is deleted and
        ``None`` is returned without trying further methods.

        Returns:
            The opened session, or ``None`` if there is no valid session.
        """
        for meth in self.methods:
            if not self._can_use_in_context(req, meth):
                gws.log.warning(f'open_session: {meth=}: insecure_context, ignore')
                continue

            sess = meth.open_session(req)
            if not sess:
                continue

            if not sess.user:
                gws.log.warning(f'open_session: {meth=}: {sess.uid=} user not found')
                self.sessionMgr.delete(sess)
                return None

            if not sess.method or sess.method.uid != meth.uid:
                gws.log.warning(f'open_session: {meth=}: {sess.uid=} wrong method {sess.method=}')
                self.sessionMgr.delete(sess)
                return None

            gws.log.debug(f'open_session: {meth=}: ok')
            return sess

        return None

    def _can_use_in_context(self, req: gws.WebRequester, meth: gws.AuthMethod):
        """Tell whether the method may be used for this request.

        A method is usable if it is not marked ``secure`` or the request is
        secure. Otherwise it is usable only when the request's ``REMOTE_ADDR``
        is listed in the method's ``allowInsecureFrom``.
        """
        if not meth.secure or req.isSecure:
            return True
        if not meth.allowInsecureFrom:
            return False
        ip = req.env('REMOTE_ADDR', '')
        if ip not in meth.allowInsecureFrom:
            return False
        gws.log.warning(f'open_session: {meth=}: insecure_context allowed from {ip=}')
        return True

    def exit_middleware(self, req: gws.WebRequester, res: gws.WebResponder):
        """Let the session's method close the session, then reset the request to the guest session."""
        sess = req.session
        if sess.method:
            sess.method.close_session(req, res)
        req.set_session(self.guestSession)

    ##

    def authenticate(self, method, credentials):
        """Authenticate credentials against the providers, in order.

        Providers that declare ``allowedMethods`` not containing the method's
        ``extType`` are skipped.

        Returns:
            The first user returned by a provider, or ``None``.
        """
        for prov in self.providers:
            if prov.allowedMethods and method.extType not in prov.allowedMethods:
                continue
            gws.log.debug(f'trying provider {prov!r}')
            user = prov.authenticate(method, credentials)
            if user:
                gws.log.debug(f'ok provider {prov!r}')
                return user
        return None

    ##

    def get_user(self, user_uid):
        """Find a user by its full uid, which is split into a provider uid and a local uid."""
        provider_uid, local_uid = gws.u.split_uid(user_uid)
        prov = self.get_provider(provider_uid)
        return prov.get_user(local_uid) if prov else None

    def get_provider(self, uid):
        """Return the provider with the given uid, or ``None``."""
        for obj in self.providers:
            if obj.uid == uid:
                return obj
        return None

    @staticmethod
    def _find_object(objects, uid, ext_type):
        """Return the first object matching ``uid`` and/or ``ext_type``, or ``None``.

        Without ``ext_type`` this is a plain uid comparison. With ``ext_type``
        the object's ``extType`` must match, and its uid too if ``uid`` is given.
        """
        for obj in objects:
            if ext_type is None:
                if obj.uid == uid:
                    return obj
                continue
            if uid is not None and obj.uid != uid:
                continue
            if getattr(obj, 'extType', None) == ext_type:
                return obj
        return None

    def get_method(self, uid=None, ext_type=None):
        """Return the authorization method matching ``uid`` and/or ``ext_type``, or ``None``."""
        return self._find_object(self.methods, uid, ext_type)

    def get_multi_factor_adapter(self, uid=None, ext_type=None):
        """Return the multi-factor adapter matching ``uid`` and/or ``ext_type``, or ``None``."""
        return self._find_object(self.mfAdapters, uid, ext_type)

    def serialize_user(self, user):
        """Serialize a user as a JSON string holding its provider uid and the provider's own serialization."""
        return gws.lib.jsonx.to_string([user.authProvider.uid, user.authProvider.serialize_user(user)])

    def unserialize_user(self, data):
        """Restore a user from a string produced by ``serialize_user``.

        Returns:
            The user, or ``None`` if the recorded provider is not found.
        """
        provider_uid, ds = gws.lib.jsonx.from_string(data)
        prov = self.get_provider(provider_uid)
        return prov.unserialize_user(ds) if prov else None

    def is_public_object(self, obj, *context):
        """Tell whether the guest user can read the object."""
        return self.guestUser.can_read(obj, *context)

143 return self.guestUser.can_read(obj, *context)