Coverage for gws-app/gws/base/auth/user.py: 82%
142 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
1from typing import Optional, cast
3import gws
4import gws.lib.jsonx
7class Props(gws.Props):
8 displayName: str
9 attributes: dict
12_FIELDS = {
13 'authToken',
14 'displayName',
15 'email',
16 'localUid',
17 'loginName',
18 'mfaSecret',
19 'mfaUid',
20}
23class User(gws.User):
24 isGuest = False
26 def __init__(self, provider, roles):
27 super().__init__()
29 self.authProvider = provider
31 self.attributes = {}
32 self.data = {}
33 self.roles = roles
34 self.uid = ''
36 for f in _FIELDS:
37 setattr(self, f, '')
39 def props(self, user):
40 return Props(displayName=self.displayName, attributes=self.attributes)
42 def has_role(self, role):
43 return role in self.roles
45 def can_use(self, obj, *context):
46 return self.can(gws.Access.read, obj, *context)
48 def can_read(self, obj, *context):
49 return self.can(gws.Access.read, obj, *context)
51 def can_write(self, obj, *context):
52 return self.can(gws.Access.write, obj, *context)
54 def can_create(self, obj, *context):
55 return self.can(gws.Access.create, obj, *context)
57 def can_edit(self, obj, *context):
58 return (
59 self.can(gws.Access.write, obj, *context)
60 or self.can(gws.Access.create, obj, *context)
61 or self.can(gws.Access.delete, obj, *context)
62 )
64 def can_delete(self, obj, *context):
65 return self.can(gws.Access.delete, obj, *context)
67 def can(self, access, obj, *context):
68 ci = 0
69 clen = len(context)
71 while obj:
72 bit = self.acl_bit(access, obj)
73 if bit is not None:
74 return bit == gws.c.ALLOW
75 obj = context[ci] if ci < clen else getattr(obj, 'parent', None)
76 ci += 1
78 return False
80 def acl_bit(self, access, obj):
81 if obj is self and access == gws.Access.read:
82 return gws.c.ALLOW
83 acl = obj.permissions.get(access)
84 if acl:
85 for bit, role in acl:
86 if role in self.roles:
87 return bit
89 def require(self, uid=None, classref=None, access=None):
90 access = access or gws.Access.read
91 obj = self.authProvider.root.get(uid, classref)
92 if not obj:
93 raise gws.NotFoundError(f'required object {classref} {uid} not found')
94 if not self.can(access, obj):
95 raise gws.ForbiddenError(f'required object {classref} {uid} forbidden')
96 return obj
98 def acquire(self, uid=None, classref=None, access=None):
99 access = access or gws.Access.read
100 obj = self.authProvider.root.get(uid, classref)
101 if obj and self.can(access, obj):
102 return obj
104 def require_project(self, uid=None):
105 return cast(gws.Project, self.require(uid, gws.ext.object.project))
107 def require_layer(self, uid=None):
108 return cast(gws.Layer, self.require(uid, gws.ext.object.layer))
111class GuestUser(User):
112 isGuest = True
115class SystemUser(User):
116 def acl_bit(self, access, obj):
117 return gws.c.ALLOW
120class NobodyUser(User):
121 def acl_bit(self, access, obj):
122 return gws.c.DENY
125class AuthorizedUser(User):
126 pass
129class AdminUser(User):
130 def acl_bit(self, access, obj):
131 return gws.c.ALLOW
134##
137##
139def to_dict(usr) -> dict:
140 d = {}
142 d['attributes'] = usr.attributes or {}
143 d['data'] = usr.data or {}
144 d['roles'] = list(usr.roles)
145 d['uid'] = usr.uid
147 for f in _FIELDS:
148 d[f] = getattr(usr, f, '')
150 return d
153def from_dict(provider: gws.AuthProvider, d: dict) -> gws.User:
154 roles = set(d.get('roles', []))
156 if gws.c.ROLE_GUEST in roles:
157 return provider.root.app.authMgr.guestUser
159 if gws.c.ROLE_ADMIN in roles:
160 usr = AdminUser(provider, roles)
161 else:
162 usr = AuthorizedUser(provider, roles)
164 for f in _FIELDS:
165 setattr(usr, f, d.get(f, ''))
167 usr.attributes = d.get('attributes', {})
168 usr.data = d.get('data', {})
169 usr.roles = roles
170 usr.uid = gws.u.join_uid(provider.uid, usr.localUid)
172 return usr
175def from_record(provider: gws.AuthProvider, user_rec: dict) -> gws.User:
176 """Create a User from a raw record as returned from a provider.
178 A provider can return an arbitrary dict of values. Entries whose keys are
179 in the `_FIELDS` list (case-insensitively), are copied to the newly
180 created `User` object.
182 Entries ``roles`` and ``attributes`` are copied as well,
183 other entries are stored in the user's ``data`` dict.
184 """
186 data = dict(user_rec)
188 roles = set(gws.u.to_list(data.pop('roles', [])))
189 roles.add(gws.c.ROLE_ALL)
191 if gws.c.ROLE_GUEST in roles:
192 return provider.root.app.authMgr.guestUser
194 if gws.c.ROLE_ADMIN in roles:
195 usr = AdminUser(provider, roles)
196 else:
197 roles.add(gws.c.ROLE_USER)
198 usr = AuthorizedUser(provider, roles)
200 for f in _FIELDS:
201 if f in data:
202 setattr(usr, f, data.pop(f))
203 continue
204 if f.lower() in data:
205 setattr(usr, f, data.pop(f.lower()))
206 continue
208 usr.attributes = data.pop('attributes', {})
209 usr.data = _process_aliases(data)
211 if not usr.loginName and 'login' in usr.data:
212 usr.loginName = usr.data['login']
214 if not usr.email and 'email' in usr.data:
215 usr.email = usr.data['email']
217 usr.localUid = usr.localUid or usr.loginName
218 if not usr.localUid:
219 raise gws.Error(f'missing local uid for user')
221 usr.displayName = usr.displayName or usr.loginName
223 usr.uid = gws.u.join_uid(provider.uid, usr.localUid)
225 return usr
228_ALIASES = [
229 # https://tools.ietf.org/html/rfc4519
230 ('c', 'countryName'),
231 ('cn', 'commonName'),
232 ('dc', 'domainComponent'),
233 ('l', 'localityName'),
234 ('o', 'organizationName'),
235 ('ou', 'organizationalUnitName'),
236 ('sn', 'surname'),
237 ('st', 'stateOrProvinceName'),
238 ('street', 'streetAddress'),
240 # non-standard
241 ('login', 'userPrincipalName'),
242 ('mail', 'email'),
243]
246def _process_aliases(r):
247 for a, b in _ALIASES:
248 if a in r:
249 r[b] = r[a]
250 elif b in r:
251 r[a] = r[b]
252 return r