Coverage for gws-app / gws / plugin / auth_provider / ldap / __init__.py: 93%

196 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""LDAP authorization provider. 

2 

3Accepts an LDAP URL in the following form:: 

4 

5 ldap://host:port/baseDN?searchAttribute 

6 

7which is a subset of the LDAP URL format defined in RFC 2255.

8 

9Optionally, a bind DN and a password can be provided. This DN must have search permissions for the directory. 

10 

11The authorization workflow with the (username, password) credentials is as follows:

12 

13- connect to the LDAP server, using the bind DN if provided 

14- search for the DN matching ``searchAttribute = credentials.username``

15- attempt to login with that DN and ``credentials.password`` 

16- iterate the ``users`` configs to determine roles for the user 

17 

18 

19References: 

20 https://datatracker.ietf.org/doc/html/rfc2255 

21 

22""" 

23 

24from typing import Optional 

25 

26import contextlib 

27 

28import ldap 

29import ldap.filter 

30 

31import gws 

32import gws.base.auth 

33import gws.lib.net 

34 

35 

# NOTE(review): presumably declares this module as the 'ldap' auth provider
# extension for the gws framework; confirm against the gws.ext documentation.
36gws.ext.new.authProvider('ldap') 

37 

38 

class UserSpec(gws.Data):
    """Map LDAP filters to authorization roles"""

    # Roles added to a user when the ``matches`` or ``memberOf`` rule applies.
    roles: list[str]
    """GWS role names"""

    # Filter run against the base DN; the rule applies when one of the
    # returned entries has the same DN as the user.
    matches: Optional[str]
    """LDAP filter the account has to match"""

    # Filter selecting group entries; the rule applies when a returned entry
    # lists the user's DN under ``member``, ``members`` or ``uniqueMember``.
    memberOf: Optional[str]
    """LDAP group the account has to be a member of"""

48 

49 

class SSLConfig(gws.Config):
    """SSL configuration."""

    # Passed to ``ldap.OPT_X_TLS_CACERTFILE`` when set.
    ca: Optional[gws.FilePath]
    """CA certificate location."""

    # ``crt`` and ``key`` are only applied when both are set
    # (``ldap.OPT_X_TLS_CERTFILE`` / ``ldap.OPT_X_TLS_KEYFILE``).
    crt: Optional[gws.FilePath]
    """Client certificate location."""

    key: Optional[gws.FilePath]
    """Key location."""

59 

60 

class Config(gws.base.auth.provider.Config):
    """LDAP authorization provider"""

    # When true, referral chasing is disabled on every connection.
    activeDirectory: bool = True
    """True if the LDAP server is ActiveDirectory."""

    # Credentials for the initial bind that performs the searches; when
    # ``bindDN`` is empty no initial bind is made.
    bindDN: Optional[str]
    """Bind DN."""

    bindPassword: Optional[str]
    """Bind password."""

    # Only used for records that have no ``displayName`` attribute.
    displayNameFormat: Optional[gws.FormatStr]
    """Format for user's display name."""

    users: list[UserSpec]
    """Map LDAP filters to gws roles."""

    # Applied as the network timeout of the LDAP connection.
    timeout: gws.Duration = '30'
    """LDAP server timeout."""

    # Expected form: ``ldap://host:port/baseDN?searchAttribute``.
    url: str
    """LDAP server url."""

    # When present, connections use the ``ldaps`` scheme.
    ssl: Optional[SSLConfig]
    """SSL configuration."""

80 

81 

class Object(gws.base.auth.provider.Object):
    """Authentication provider backed by an LDAP directory.

    An account is located by searching ``baseDN`` for
    ``loginAttribute = username``, the password is verified with a simple bind
    as the DN that was found, and the roles are collected from the configured
    ``users`` rules.
    """

    # Server address as ``scheme://netloc``, built in ``configure``.
    serverUrl: str
    # Search base: the path part of the configured url without slashes.
    baseDN: str
    # Attribute compared with the login name: the query part of the configured url.
    loginAttribute: str
    # Value passed to ``ldap.OPT_NETWORK_TIMEOUT``.
    timeout: int
    # TLS settings; when set, the ``ldaps`` scheme is used.
    ssl: Optional[SSLConfig]
    # When true, referral chasing is switched off on every connection.
    activeDirectory: bool
    # DN and password for the initial bind; no bind is made if ``bindDN`` is empty.
    bindDN: str
    bindPassword: str
    # Format string used when a record has no ``displayName``.
    displayNameFormat: str
    # Role mapping rules, see ``_roles_for_user``.
    users: list[UserSpec]

    def configure(self):
        """Read the configuration and check that the server can be reached.

        Raises:
            gws.Error: If a test connection (including the optional initial
                bind) cannot be established.
        """
        self.timeout = self.cfg('timeout', default=30)

        self.activeDirectory = self.cfg('activeDirectory', default=True)
        self.bindDN = self.cfg('bindDN', default='')
        self.bindPassword = self.cfg('bindPassword', default='')
        self.displayNameFormat = self.cfg('displayNameFormat', default='')
        self.ssl = self.cfg('ssl')
        self.users = self.cfg('users', default=[])

        # The scheme of the configured url is not used: 'ldaps' is selected
        # if and only if an ``ssl`` block is configured.
        proto = 'ldaps' if self.ssl else 'ldap'
        p = gws.lib.net.parse_url(self.cfg('url'))

        self.serverUrl = proto + '://' + p.netloc
        self.baseDN = p.path.strip('/')
        self.loginAttribute = p.query

        # Fail early on a bad url, TLS setup or bind credentials.
        try:
            with self._connection():
                gws.log.debug(f'LDAP connection {self.uid!r} ok')
        except Exception as exc:
            raise gws.Error(f'LDAP connection error: {exc.__class__.__name__}') from exc

    def authenticate(self, method, credentials):
        """Authenticate a user with username/password credentials.

        Args:
            method: The calling authentication method (not used here).
            credentials: Object providing ``username`` and ``password`` via ``get``.

        Returns:
            The user built from the LDAP record, or ``None`` if a credential is
            missing or blank, or if no account matches the username.

        Raises:
            gws.ForbiddenError: If the account is ambiguous or disabled, or if
                the bind with the given password fails.
        """
        # NOTE(review): the password is stripped as well, so passwords with
        # leading or trailing whitespace cannot authenticate; confirm this is intended.
        username = credentials.get('username', '').strip()
        password = credentials.get('password', '').strip()
        if not username or not password:
            return None

        with self._connection() as conn:
            rec = self._get_user_record(conn, username, password)
        if not rec:
            return None

        # The connection above ended up bound as the user; open a fresh one
        # (bound as ``bindDN``, if configured) for the role lookups.
        with self._connection() as conn:
            return self._make_user(conn, rec)

    def get_user(self, local_uid):
        """Load a user by local uid, i.e. by the value of the login attribute.

        Returns:
            The user, or ``None`` unless exactly one entry matches.
        """
        with self._connection() as conn:
            users = self._find(conn, _make_filter({self.loginAttribute: local_uid}))
            if len(users) == 1:
                return self._make_user(conn, users[0])
        return None

    ##

    def _get_user_record(self, conn, username, password):
        """Find the entry for ``username`` and verify ``password`` against it.

        Returns:
            The record dict of the account, or ``None`` if there is no such account.

        Raises:
            gws.ForbiddenError: If several entries match, if the ACCOUNTDISABLE
                flag is set in ``userAccountControl``, or if the bind as the
                account's DN fails.
        """
        users = self._find(conn, _make_filter({self.loginAttribute: username}))

        if not users:
            return None
        if len(users) > 1:
            raise gws.ForbiddenError(f'multiple entries for {username!r}')

        rec = users[0]

        # check for AD disabled accounts
        uac = str(rec.get('userAccountControl', ''))
        if uac.isdigit() and int(uac) & _MS_ACCOUNTDISABLE:
            raise gws.ForbiddenError('ACCOUNTDISABLE flag set')

        # The password check is a re-bind of the same connection as the user.
        try:
            conn.simple_bind_s(rec['dn'], password)
        except ldap.INVALID_CREDENTIALS as exc:
            raise gws.ForbiddenError(f'wrong password for {username!r}') from exc
        except ldap.LDAPError as exc:
            gws.log.exception()
            raise gws.ForbiddenError(f'LDAP error {exc.__class__.__name__}') from exc
        return rec

    def _make_user(self, conn, rec):
        """Build a user from an LDAP record.

        Adds ``roles``, fills ``displayName`` from ``displayNameFormat`` when the
        record has none, and moves the login attribute value into ``localUid``
        and ``loginName``. ``rec`` itself is not modified.
        """
        user_rec = dict(rec)
        user_rec['roles'] = self._roles_for_user(conn, rec)

        if not user_rec.get('displayName') and self.displayNameFormat:
            user_rec['displayName'] = gws.u.format_map(self.displayNameFormat, rec)

        login = user_rec.pop(self.loginAttribute, '')
        user_rec['localUid'] = user_rec['loginName'] = login

        return gws.base.auth.user.from_record(self, user_rec)

    def _roles_for_user(self, conn, rec):
        """Return the sorted role names granted to the account in ``rec``.

        For each rule in ``users``: ``matches`` grants the rule's roles when the
        filter returns an entry with the user's DN; ``memberOf`` grants them when
        an entry returned by the filter lists the user's DN as a member.
        """
        user_dn = rec['dn']
        roles = set()

        for u in self.users:
            if u.get('matches'):
                for dct in self._find(conn, u.matches):
                    if dct['dn'] == user_dn:
                        roles.update(u.roles)

            if u.get('memberOf'):
                for dct in self._find(conn, u.memberOf):
                    if _is_member_of(dct, user_dn):
                        roles.update(u.roles)

        return sorted(roles)

    def _find(self, conn, flt):
        """Run a subtree search under ``baseDN`` and return the entries as dicts.

        Each dict holds the entry's attributes (see ``_as_dict``) plus its DN
        under the key ``dn``. A missing base object yields an empty list.
        """
        try:
            res = conn.search_s(self.baseDN, ldap.SCOPE_SUBTREE, flt)
        except ldap.NO_SUCH_OBJECT:
            return []

        dcts = []

        for dn, data in res:
            # Results without a DN are skipped (presumably search references).
            if dn:
                d = _as_dict(data)
                d['dn'] = dn
                dcts.append(d)

        return dcts

    @contextlib.contextmanager
    def _connection(self):
        """Context manager yielding a configured and, if ``bindDN`` is set, bound connection.

        The connection is always released with ``unbind_s``, also when setting
        an option or the initial bind fails before the connection is handed out.
        """
        conn = ldap.initialize(self.serverUrl)

        # Everything after ``initialize`` is inside the ``try`` so that a failed
        # option or bind does not leave the connection open.
        try:
            conn.set_option(ldap.OPT_NETWORK_TIMEOUT, self.timeout)

            if self.ssl:
                if self.root.app.developer_option('ldap.ssl_insecure'):
                    conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
                else:
                    conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)
                if self.ssl.ca:
                    conn.set_option(ldap.OPT_X_TLS_CACERTFILE, self.ssl.ca)
                if self.ssl.crt and self.ssl.key:
                    conn.set_option(ldap.OPT_X_TLS_CERTFILE, self.ssl.crt)
                    conn.set_option(ldap.OPT_X_TLS_KEYFILE, self.ssl.key)
                # Has to come after the other TLS options: it applies them.
                conn.set_option(ldap.OPT_X_TLS_NEWCTX, 0)

            if self.activeDirectory:
                # see https://www.python-ldap.org/faq.html#usage
                conn.set_option(ldap.OPT_REFERRALS, 0)

            if self.bindDN:
                conn.simple_bind_s(self.bindDN, self.bindPassword)

            yield conn
        finally:
            conn.unbind_s()

238 

239 

def _as_dict(data):
    """Convert a mapping of LDAP attributes into a plain dict of strings.

    Empty values are dropped. Every value is passed through ``gws.u.to_str``;
    an attribute with exactly one value is stored as that single string, an
    attribute with several values as a list of strings.
    """
    result = {}

    for name, raw in data.items():
        if not raw:
            continue
        values = raw if isinstance(raw, list) else [raw]
        strings = [gws.u.to_str(item) for item in values]
        result[name] = strings if len(strings) != 1 else strings[0]

    return result

252 

253 

def _make_filter(filter_dict):
    """Build an LDAP AND-filter ``(&(k1=v1)(k2=v2)...)`` from a dict.

    Keys and values are both escaped with ``escape_filter_chars`` in escape
    mode 1 before being placed into the filter.
    """
    escape = ldap.filter.escape_filter_chars
    terms = [
        f'({escape(attr, 1)}={escape(value, 1)})'
        for attr, value in filter_dict.items()
    ]
    return '(&' + ''.join(terms) + ')'

263 

264 

def _is_member_of(group_dict, user_dn):
    """Tell whether ``user_dn`` is listed as a member of a group record.

    The attributes ``member``, ``members`` and ``uniqueMember`` are checked.
    An attribute value may be a list of DNs or, for a group with a single
    member, one bare string (``_as_dict`` collapses single-valued attributes).
    A bare string is compared as a whole: a plain ``in`` test on it would be
    a substring match and could accept a DN that is only a suffix of the
    real member's DN.

    Args:
        group_dict: Record dict of the group entry.
        user_dn: DN of the user, compared verbatim.

    Returns:
        ``True`` if the DN is listed, ``False`` otherwise.
    """
    for key in ('member', 'members', 'uniqueMember'):
        members = group_dict.get(key)
        if not members:
            continue
        if isinstance(members, str):
            members = [members]
        if user_dn in members:
            return True
    return False

269 

270 

# Bit flags of the Active Directory ``userAccountControl`` attribute.
# https://support.microsoft.com/en-us/help/305144
#
# Only _MS_ACCOUNTDISABLE is referenced elsewhere in this listing; the other
# flags are spelled out for reference.

# account state
_MS_SCRIPT = 0x0000_0001
_MS_ACCOUNTDISABLE = 0x0000_0002
_MS_HOMEDIR_REQUIRED = 0x0000_0008
_MS_LOCKOUT = 0x0000_0010

# password handling
_MS_PASSWD_NOTREQD = 0x0000_0020
_MS_PASSWD_CANT_CHANGE = 0x0000_0040
_MS_ENCRYPTED_TEXT_PWD_ALLOWED = 0x0000_0080

# account types
_MS_TEMP_DUPLICATE_ACCOUNT = 0x0000_0100
_MS_NORMAL_ACCOUNT = 0x0000_0200
_MS_INTERDOMAIN_TRUST_ACCOUNT = 0x0000_0800
_MS_WORKSTATION_TRUST_ACCOUNT = 0x0000_1000
_MS_SERVER_TRUST_ACCOUNT = 0x0000_2000

# logon, delegation and expiry
_MS_DONT_EXPIRE_PASSWORD = 0x0001_0000
_MS_MNS_LOGON_ACCOUNT = 0x0002_0000
_MS_SMARTCARD_REQUIRED = 0x0004_0000
_MS_TRUSTED_FOR_DELEGATION = 0x0008_0000
_MS_NOT_DELEGATED = 0x0010_0000
_MS_USE_DES_KEY_ONLY = 0x0020_0000
_MS_DONT_REQ_PREAUTH = 0x0040_0000
_MS_PASSWORD_EXPIRED = 0x0080_0000
_MS_TRUSTED_TO_AUTH_FOR_DELEGATION = 0x0100_0000
_MS_PARTIAL_SECRETS_ACCOUNT = 0x0400_0000