Coverage for gws-app / gws / plugin / auth_provider / ldap / __init__.py: 93%
196 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""LDAP authorization provider.
3Accepts an LDAP URL in the following form::
5 ldap://host:port/baseDN?searchAttribute
7which is a subset of the rfc2255 schema.
9Optionally, a bind DN and a password can be provided. This DN must have search permissions for the directory.
11The authorization workflow with the (login, password) credentials is as follows:
13- connect to the LDAP server, using the bind DN if provided
14- search for the DN matching ``searchAttribute = credentials.login``
15- attempt to login with that DN and ``credentials.password``
16- iterate the ``users`` configs to determine roles for the user
19References:
20 https://datatracker.ietf.org/doc/html/rfc2255
22"""
24from typing import Optional
26import contextlib
28import ldap
29import ldap.filter
31import gws
32import gws.base.auth
33import gws.lib.net
36gws.ext.new.authProvider('ldap')
39class UserSpec(gws.Data):
40 """Map LDAP filters to authorization roles"""
42 roles: list[str]
43 """GWS role names"""
44 matches: Optional[str]
45 """LDAP filter the account has to match"""
46 memberOf: Optional[str]
47 """LDAP group the account has to be a member of"""
50class SSLConfig(gws.Config):
51 """SSL configuration."""
53 ca: Optional[gws.FilePath]
54 """CA certificate location."""
55 crt: Optional[gws.FilePath]
56 """Client certificate location."""
57 key: Optional[gws.FilePath]
58 """Key location."""
61class Config(gws.base.auth.provider.Config):
62 """LDAP authorization provider"""
64 activeDirectory: bool = True
65 """True if the LDAP server is ActiveDirectory."""
66 bindDN: Optional[str]
67 """Bind DN."""
68 bindPassword: Optional[str]
69 """Bind password."""
70 displayNameFormat: Optional[gws.FormatStr]
71 """Format for user's display name."""
72 users: list[UserSpec]
73 """Map LDAP filters to gws roles."""
74 timeout: gws.Duration = '30'
75 """LDAP server timeout."""
76 url: str
77 """LDAP server url."""
78 ssl: Optional[SSLConfig]
79 """SSL configuration."""
82class Object(gws.base.auth.provider.Object):
83 serverUrl: str
84 baseDN: str
85 loginAttribute: str
86 timeout: int
87 ssl: Optional[SSLConfig]
88 activeDirectory: bool
89 bindDN: str
90 bindPassword: str
91 displayNameFormat: str
92 users: list[UserSpec]
94 def configure(self):
95 self.timeout = self.cfg('timeout', default=30)
97 self.activeDirectory = self.cfg('activeDirectory', default=True)
98 self.bindDN = self.cfg('bindDN', default='')
99 self.bindPassword = self.cfg('bindPassword', default='')
100 self.displayNameFormat = self.cfg('displayNameFormat', default='')
101 self.ssl = self.cfg('ssl')
102 self.users = self.cfg('users', default=[])
104 proto = 'ldaps' if self.ssl else 'ldap'
105 p = gws.lib.net.parse_url(self.cfg('url'))
107 self.serverUrl = proto + '://' + p.netloc
108 self.baseDN = p.path.strip('/')
109 self.loginAttribute = p.query
111 try:
112 with self._connection():
113 gws.log.debug(f'LDAP connection {self.uid!r} ok')
114 except Exception as exc:
115 raise gws.Error(f'LDAP connection error: {exc.__class__.__name__}') from exc
117 def authenticate(self, method, credentials):
118 username = credentials.get('username', '').strip()
119 password = credentials.get('password', '').strip()
120 if not username or not password:
121 return
123 with self._connection() as conn:
124 rec = self._get_user_record(conn, username, password)
125 if not rec:
126 return
128 # NB need to rebind as admin
129 with self._connection() as conn:
130 return self._make_user(conn, rec)
132 def get_user(self, local_uid):
133 with self._connection() as conn:
134 users = self._find(conn, _make_filter({self.loginAttribute: local_uid}))
135 if len(users) == 1:
136 return self._make_user(conn, users[0])
138 ##
140 def _get_user_record(self, conn, username, password):
141 users = self._find(conn, _make_filter({self.loginAttribute: username}))
143 if len(users) == 0:
144 return
145 if len(users) > 1:
146 raise gws.ForbiddenError(f'multiple entries for {username!r}')
148 rec = users[0]
150 # check for AD disabled accounts
151 uac = str(rec.get('userAccountControl', ''))
152 if uac and uac.isdigit():
153 if int(uac) & _MS_ACCOUNTDISABLE:
154 raise gws.ForbiddenError('ACCOUNTDISABLE flag set')
156 try:
157 conn.simple_bind_s(rec['dn'], password)
158 return rec
159 except ldap.INVALID_CREDENTIALS:
160 raise gws.ForbiddenError(f'wrong password for {username!r}')
161 except ldap.LDAPError as exc:
162 gws.log.exception()
163 raise gws.ForbiddenError(f'LDAP error {exc.__class__.__name__}') from exc
165 def _make_user(self, conn, rec):
166 user_rec = dict(rec)
167 user_rec['roles'] = self._roles_for_user(conn, rec)
169 if not user_rec.get('displayName') and self.displayNameFormat:
170 user_rec['displayName'] = gws.u.format_map(self.displayNameFormat, rec)
172 login = user_rec.pop(self.loginAttribute, '')
173 user_rec['localUid'] = user_rec['loginName'] = login
175 return gws.base.auth.user.from_record(self, user_rec)
177 def _roles_for_user(self, conn, rec):
178 user_dn = rec['dn']
179 roles = set()
181 for u in self.users:
182 if u.get('matches'):
183 for dct in self._find(conn, u.matches):
184 if dct['dn'] == user_dn:
185 roles.update(u.roles)
187 if u.get('memberOf'):
188 for dct in self._find(conn, u.memberOf):
189 if _is_member_of(dct, user_dn):
190 roles.update(u.roles)
192 return sorted(roles)
194 def _find(self, conn, flt):
195 try:
196 res = conn.search_s(self.baseDN, ldap.SCOPE_SUBTREE, flt)
197 except ldap.NO_SUCH_OBJECT:
198 return []
200 dcts = []
202 for dn, data in res:
203 if dn:
204 d = _as_dict(data)
205 d['dn'] = dn
206 dcts.append(d)
208 return dcts
210 @contextlib.contextmanager
211 def _connection(self):
212 conn = ldap.initialize(self.serverUrl)
213 conn.set_option(ldap.OPT_NETWORK_TIMEOUT, self.timeout)
215 if self.ssl:
216 if self.root.app.developer_option('ldap.ssl_insecure'):
217 conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
218 else:
219 conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
220 if self.ssl.ca:
221 conn.set_option(ldap.OPT_X_TLS_CACERTFILE, self.ssl.ca)
222 if self.ssl.crt and self.ssl.key:
223 conn.set_option(ldap.OPT_X_TLS_CERTFILE, self.ssl.crt)
224 conn.set_option(ldap.OPT_X_TLS_KEYFILE, self.ssl.key)
225 conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)
227 if self.activeDirectory:
228 # see https://www.python-ldap.org/faq.html#usage
229 conn.set_option(ldap.OPT_REFERRALS, 0)
231 if self.bindDN:
232 conn.simple_bind_s(self.bindDN, self.bindPassword)
234 try:
235 yield conn
236 finally:
237 conn.unbind_s()
240def _as_dict(data):
241 d = {}
243 for k, v in data.items():
244 if not v:
245 continue
246 if not isinstance(v, list):
247 v = [v]
248 v = [gws.u.to_str(s) for s in v]
249 d[k] = v[0] if len(v) == 1 else v
251 return d
254def _make_filter(filter_dict):
255 conds = ''.join(
256 '({}={})'.format(
257 ldap.filter.escape_filter_chars(k, 1),
258 ldap.filter.escape_filter_chars(v, 1),
259 )
260 for k, v in filter_dict.items()
261 )
262 return '(&' + conds + ')'
265def _is_member_of(group_dict, user_dn):
266 for key in 'member', 'members', 'uniqueMember':
267 if key in group_dict and user_dn in group_dict[key]:
268 return True
271# https://support.microsoft.com/en-us/help/305144
273_MS_SCRIPT = 0x0001
274_MS_ACCOUNTDISABLE = 0x0002
275_MS_HOMEDIR_REQUIRED = 0x0008
276_MS_LOCKOUT = 0x0010
277_MS_PASSWD_NOTREQD = 0x0020
278_MS_PASSWD_CANT_CHANGE = 0x0040
279_MS_ENCRYPTED_TEXT_PWD_ALLOWED = 0x0080
280_MS_TEMP_DUPLICATE_ACCOUNT = 0x0100
281_MS_NORMAL_ACCOUNT = 0x0200
282_MS_INTERDOMAIN_TRUST_ACCOUNT = 0x0800
283_MS_WORKSTATION_TRUST_ACCOUNT = 0x1000
284_MS_SERVER_TRUST_ACCOUNT = 0x2000
285_MS_DONT_EXPIRE_PASSWORD = 0x10000
286_MS_MNS_LOGON_ACCOUNT = 0x20000
287_MS_SMARTCARD_REQUIRED = 0x40000
288_MS_TRUSTED_FOR_DELEGATION = 0x80000
289_MS_NOT_DELEGATED = 0x100000
290_MS_USE_DES_KEY_ONLY = 0x200000
291_MS_DONT_REQ_PREAUTH = 0x400000
292_MS_PASSWORD_EXPIRED = 0x800000
293_MS_TRUSTED_TO_AUTH_FOR_DELEGATION = 0x1000000
294_MS_PARTIAL_SECRETS_ACCOUNT = 0x04000000