Coverage for gws-app/gws/plugin/account/helper.py: 0%

211 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1from typing import Optional, cast 

2 

3import gws 

4import gws.base.edit.helper 

5import gws.config.util 

6import gws.plugin.email_helper 

7import gws.lib.image 

8import gws.lib.net 

9import gws.lib.otp 

10 

11from . import core 

12 

# Declares this module as the 'account' helper extension.
# NOTE(review): presumably consumed by the gws extension/spec machinery — confirm.
gws.ext.new.helper('account')

14 

15 

class MfaConfig:
    """Multi-factor authentication configuration."""

    # One entry of the helper's ``mfa`` list; ``Object.post_configure`` reads
    # ``mfaUid`` and ``title`` from each entry.

    mfaUid: str
    """UID of the multi-factor authentication adapter."""
    title: str
    """Title of the multi-factor authentication method."""

23 

24 

class Config(gws.Config):
    """Account helper."""

    # -- models and templates

    adminModel: gws.ext.config.model
    """Edit model for account administration."""
    userModel: Optional[gws.ext.config.model]
    """Edit model for end-users accounts."""
    templates: list[gws.ext.config.template]
    """Templates"""

    # -- login and password handling

    usernameColumn: str = 'email'
    """Column used as 'login'."""

    passwordCreateSql: Optional[str]
    """SQL expression for computing password hashes."""
    passwordVerifySql: Optional[str]
    """SQL expression for verifying password hashes."""

    # -- temporary codes

    tcLifeTime: gws.Duration = '3600'
    """Life time for temporary codes."""

    # -- multi-factor authentication

    mfa: Optional[list[MfaConfig]]
    """Multi-factor authentication methods the user can choose from."""
    mfaIssuer: str = ''
    """Issuer name for Multi-factor key uris (qr codes)."""

    # -- onboarding

    onboardingUrl: str
    """URL for email onboarding."""
    onboardingCompletionUrl: str = ''
    """URL to redirect after onboarding."""

55 

56 

57## 

58 

59 

class Error(gws.Error):
    """Account-related error."""

64 

65 

66## 

67 

68 

class MfaOption(gws.Data):
    """One selectable multi-factor option, built in ``Object.post_configure``."""

    # 1-based position of the option in the configured ``mfa`` list.
    index: int
    # Display title taken from the configuration entry.
    title: str
    # Resolved adapter; left unset when the entry has no ``mfaUid``.
    adapter: Optional[gws.AuthMultiFactorAdapter]

73 

74 

# Fallback SQL expressions used when the configuration does not provide its own.
# '{password}' is replaced with the ':password' bind parameter and
# '{passwordColumn}' with the password column name before use.
# NOTE(review): crypt/gen_salt look like PostgreSQL pgcrypto functions — confirm.
_DEFAULT_PASSWORD_CREATE_SQL = "crypt( {password}, gen_salt('bf') )"
_DEFAULT_PASSWORD_VERIFY_SQL = 'crypt( {password}, {passwordColumn} )'

77 

78 

class Object(gws.base.edit.helper.Object):
    """Account helper: account lookup, passwords, MFA, temporary codes and emails."""

    # Models and templates.
    adminModel: gws.DatabaseModel
    userModel: gws.DatabaseModel
    templates: list[gws.Template]

    # Settings copied from the configuration in ``configure``;
    # ``mfaOptions`` is populated later, in ``post_configure``.
    mfaIssuer: str
    mfaOptions: list[MfaOption]
    onboardingUrl: str
    onboardingCompletionUrl: str
    passwordCreateSql: str
    passwordVerifySql: str
    tcLifeTime: int
    usernameColumn: str

92 

def configure(self):
    """Configure templates, create the admin model and copy settings from the config."""
    self.configure_templates()

    admin_model = self.create_child(gws.ext.object.model, self.cfg('adminModel'))
    self.adminModel = cast(gws.DatabaseModel, admin_model)

    self.mfaIssuer = self.cfg('mfaIssuer')
    # Starts empty; the actual options are appended in post_configure.
    self.mfaOptions = []

    self.onboardingUrl = self.cfg('onboardingUrl')
    # Without an explicit completion URL, fall back to the onboarding URL.
    completion_url = self.cfg('onboardingCompletionUrl')
    self.onboardingCompletionUrl = completion_url or self.onboardingUrl

    self.passwordCreateSql = self.cfg('passwordCreateSql', default=_DEFAULT_PASSWORD_CREATE_SQL)
    self.passwordVerifySql = self.cfg('passwordVerifySql', default=_DEFAULT_PASSWORD_VERIFY_SQL)

    self.tcLifeTime = self.cfg('tcLifeTime', default=3600)

    self.usernameColumn = self.cfg('usernameColumn', default=core.Columns.email)

110 

def configure_templates(self):
    """Delegate template configuration to ``gws.config.util`` and return its result."""
    result = gws.config.util.configure_templates_for(self)
    return result

113 

def post_configure(self):
    """Build ``self.mfaOptions`` from the configured ``mfa`` entries.

    Options are numbered from 1. An entry with an ``mfaUid`` must resolve
    to an existing object via ``self.root.get``.

    Raises:
        gws.ConfigurationError: If an ``mfaUid`` does not resolve.
    """
    mfa_entries = self.cfg('mfa', default=[])

    # NOTE: the loop variable must stay ``c`` — its name is part of the error text below.
    for position, c in enumerate(mfa_entries, start=1):
        option = MfaOption(index=position, title=c.title)

        if c.mfaUid:
            option.adapter = self.root.get(c.mfaUid)
            if not option.adapter:
                raise gws.ConfigurationError(f'MFA Adapter not found {c.mfaUid=}')

        self.mfaOptions.append(option)

122 

123 ## 

124 

def get_models(self, req, p):
    """Return the editable models: always just the admin model.

    ``req`` and ``p`` are accepted but not used here.
    """
    models = [self.adminModel]
    return models

127 

def write_feature(self, req, p):
    """Write a feature and, for a newly created one written without errors, reset the account.

    Returns:
        Whatever the parent ``write_feature`` returned.
    """
    # Remember the flag before delegating to the parent implementation.
    was_new = p.feature.isNew
    feature = super().write_feature(req, p)

    if not feature or feature.errors or not was_new:
        return feature

    # A freshly created account is reset (which may also send the onboarding email).
    self.reset(self.get_account_by_id(feature.uid()))
    return feature

137 

138 ## 

139 

def get_account_by_id(self, uid: str) -> Optional[dict]:
    """Fetch the complete account row for ``uid`` from the admin model's table.

    Returns:
        The first matching row, or ``None`` if there is no match.
    """
    model = self.adminModel
    sql = f"""
        SELECT * FROM {model.tableName}
        WHERE {model.uidName}=:uid
    """
    rows = model.db.select_text(sql, uid=uid)
    if not rows:
        return None
    return rows[0]

147 

def get_account_by_credentials(self, credentials: gws.Data, expected_status: Optional[core.Status] = None) -> Optional[dict]:
    """Look up an account by username and password.

    Args:
        credentials: Data object providing 'username' and 'password'.
        expected_status: If given (truthy), the account must have this status.

    Returns:
        The full account row, or ``None`` if no row matches the username.

    Raises:
        Error: If several rows match the username, the password does not
            verify, or the status differs from ``expected_status``.
    """
    # Turn the configured template into an SQL expression with a bind parameter.
    expr = (
        self.passwordVerifySql
        .replace('{password}', ':password')
        .replace('{passwordColumn}', core.Columns.password)
    )

    username = credentials.get('username')
    password = credentials.get('password')

    model = self.adminModel
    sql = f"""
        SELECT
            {model.uidName},
            ( {core.Columns.password} = {expr} ) AS validpassword,
            {core.Columns.status}
        FROM
            {model.tableName}
        WHERE
            {self.usernameColumn} = :username
    """
    rows = model.db.select_text(sql, username=username, password=password)

    if not rows:
        gws.log.warning(f'get_account_by_credentials: {username=} not found')
        return None

    if len(rows) > 1:
        raise Error(f'get_account_by_credentials: multiple entries for {username=}')

    row = rows[0]

    if not row.get('validpassword'):
        raise Error(f'get_account_by_credentials: {username=} wrong password')

    if expected_status:
        status = row.get(core.Columns.status)
        if status != expected_status:
            raise Error(f'get_account_by_credentials: {username=} wrong {status=} {expected_status=}')

    # Re-read the account so the caller gets the complete row.
    return self.get_account_by_id(self.get_uid(row))

186 

def get_account_by_tc(self, tc: str, category: str, expected_status: Optional[core.Status] = None) -> Optional[dict]:
    """Look up an account by a temporary code.

    The code is cleared from the table as soon as at least one row matches,
    before any further check, so a code can only be presented once.

    Args:
        tc: Temporary code to look up.
        category: Category the stored code must have.
        expected_status: If given (truthy), the account must have this status.

    Returns:
        The full account row, or ``None`` (with a logged warning) if the code
        is unknown, has the wrong category, is expired, or the account status
        differs from ``expected_status``.

    Raises:
        Error: If several rows carry the same code.
    """
    model = self.adminModel
    sql = f"""
        SELECT
            {model.uidName},
            {core.Columns.tcTime},
            {core.Columns.tcCategory},
            {core.Columns.status}
        FROM
            {model.tableName}
        WHERE
            {core.Columns.tc} = :tc
    """
    rs = model.db.select_text(sql, tc=tc)

    if not rs:
        gws.log.warning(f'get_account_by_tc: {tc=} not found')
        return None

    # Single use: wipe the code before validating anything else.
    self.invalidate_tc(tc)

    if len(rs) > 1:
        raise Error(f'get_account_by_tc: {tc=} multiple entries')

    r = rs[0]

    if r.get(core.Columns.tcCategory) != category:
        gws.log.warning(f'get_account_by_tc: {category=} {tc=} wrong category')
        return None

    # A NULL timestamp comes back as None (the key exists, so a .get default
    # would not apply); treat it as 0, which makes the code count as expired
    # instead of raising TypeError on the subtraction.
    tc_time = r.get(core.Columns.tcTime) or 0
    if gws.u.stime() - tc_time > self.tcLifeTime:
        gws.log.warning(f'get_account_by_tc: {category=} {tc=} expired')
        return None

    if expected_status:
        status = r.get(core.Columns.status)
        if status != expected_status:
            gws.log.warning(f'get_account_by_tc: {category=} {tc=} wrong {status=} {expected_status=}')
            return None

    # Re-read the account so the caller gets the complete row.
    return self.get_account_by_id(self.get_uid(r))

227 

228 ## 

229 

def set_password(self, account: dict, password):
    """Store a new password for ``account`` using the configured create-SQL expression.

    The plain password is passed as the ``:password`` bind parameter.
    """
    # Turn the configured template into an SQL expression with a bind parameter.
    expr = (
        self.passwordCreateSql
        .replace('{password}', ':password')
        .replace('{passwordColumn}', core.Columns.password)
    )

    model = self.adminModel
    sql = f"""
        UPDATE {model.tableName}
        SET
            {core.Columns.password} = {expr}
        WHERE
            {model.uidName} = :uid
    """
    account_uid = self.get_uid(account)
    model.db.execute_text(sql, password=password, uid=account_uid)

243 

def validate_password(self, password: str) -> bool:
    """Check whether ``password`` is acceptable.

    Currently the only rule is that the password must contain at least one
    non-whitespace character.

    Args:
        password: Candidate password.

    Returns:
        ``False`` for ``None``, non-string values, and empty or
        whitespace-only strings; ``True`` otherwise.
    """
    # Missing or non-string input is rejected instead of crashing on .strip().
    if not isinstance(password, str):
        return False
    # @TODO password complexity validation
    return bool(password.strip())

249 

250 ## 

251 

def set_mfa(self, account: dict, mfa_option_index: int):
    """Store the MFA adapter uid of the chosen option on the account.

    An option without an adapter is stored as an empty string.

    Raises:
        Error: If no option with ``mfa_option_index`` exists for the account.
    """
    # First option with the requested index; None means "no such option".
    mfa_uid = next(
        (
            (mo.adapter.uid if mo.adapter else '')
            for mo in self.mfa_options(account)
            if mo.index == mfa_option_index
        ),
        None,
    )

    if mfa_uid is None:
        raise Error(f'{mfa_option_index=} not found')

    model = self.adminModel
    sql = f"""
        UPDATE {model.tableName}
        SET
            {core.Columns.mfaUid} = :mfa_uid
        WHERE
            {model.uidName} = :uid
    """
    model.db.execute_text(sql, mfa_uid=mfa_uid, uid=self.get_uid(account))

271 

def mfa_options(self, account: dict) -> list[MfaOption]:
    """Return the MFA options available to ``account``.

    At present every account gets the same configured list; ``account`` is unused.
    """
    # @TODO different options per account
    options = self.mfaOptions
    return options

275 

def generate_mfa_secret(self, account: dict) -> str:
    """Create a new random OTP secret, store it on the account and return it."""
    model = self.adminModel
    new_secret = gws.lib.otp.random_secret()

    sql = f"""
        UPDATE {model.tableName}
        SET
            {core.Columns.mfaSecret} = :secret
        WHERE
            {model.uidName} = :uid
    """
    model.db.execute_text(sql, secret=new_secret, uid=self.get_uid(account))

    return new_secret

289 

def qr_code_for_mfa(self, account: dict, mo: MfaOption, secret: str) -> str:
    """Return a QR code (as a data URL) for the option's key URI.

    Returns:
        An empty string if the option has no adapter or the adapter
        yields no key URI; otherwise the QR image as a data URL.
    """
    adapter = mo.adapter
    if not adapter:
        return ''

    # The account's username column value is passed to the adapter's key_uri.
    key_uri = adapter.key_uri(secret, self.mfaIssuer, account.get(self.usernameColumn))
    return gws.lib.image.qr_code(key_uri).to_data_url() if key_uri else ''

297 

298 ## 

299 

def set_status(self, account: dict, status: core.Status):
    """Write ``status`` into the status column of ``account``."""
    model = self.adminModel
    sql = f"""
        UPDATE {model.tableName}
        SET
            {core.Columns.status} = :status
        WHERE
            {model.uidName} = :uid
    """
    account_uid = self.get_uid(account)
    model.db.execute_text(sql, status=status, uid=account_uid)

309 

def reset(self, account: dict):
    """Put ``account`` back into the 'new' state.

    Sets the status to ``core.Status.new`` and blanks the password and the
    MFA secret. If an onboarding URL is configured, an onboarding email is
    sent afterwards.
    """
    model = self.adminModel
    sql = f"""
        UPDATE {model.tableName}
        SET
            {core.Columns.status} = :status,
            {core.Columns.password} = '',
            {core.Columns.mfaSecret} = ''
        WHERE
            {model.uidName} = :uid
    """
    model.db.execute_text(sql, status=core.Status.new, uid=self.get_uid(account))

    if not self.onboardingUrl:
        return
    self.send_onboarding_email(account)

324 

325 ## 

326 

def send_onboarding_email(self, account: dict):
    """Generate an onboarding code and mail the resulting onboarding link to the account."""
    category = core.Category.onboarding
    code = self.generate_tc(account, category)
    # The code is attached to the onboarding URL as the 'onboarding' query parameter.
    link = gws.lib.net.add_params(self.onboardingUrl, onboarding=code)
    self.send_mail(account, category, {'url': link})

331 

def generate_tc(self, account: dict, category: str) -> str:
    """Create a temporary code for ``account`` and store it with its time and category.

    Returns:
        The newly generated code.
    """
    model = self.adminModel
    code = self.make_tc()

    sql = f"""
        UPDATE {model.tableName}
        SET
            {core.Columns.tc} = :tc,
            {core.Columns.tcTime} = :time,
            {core.Columns.tcCategory} = :category
        WHERE
            {model.uidName} = :uid
    """
    model.db.execute_text(
        sql,
        tc=code,
        time=gws.u.stime(),
        category=category,
        uid=self.get_uid(account),
    )

    return code

347 

def clear_tc(self, account: dict):
    """Blank the temporary code, its time and its category for ``account``."""
    model = self.adminModel
    sql = f"""
        UPDATE {model.tableName}
        SET
            {core.Columns.tc} = '',
            {core.Columns.tcTime} = 0,
            {core.Columns.tcCategory} = ''
        WHERE
            {model.uidName} = :uid
    """
    account_uid = self.get_uid(account)
    model.db.execute_text(sql, uid=account_uid)

359 

def invalidate_tc(self, tc: str):
    """Blank the temporary code fields on every row that currently stores ``tc``."""
    model = self.adminModel
    sql = f"""
        UPDATE {model.tableName}
        SET
            {core.Columns.tc} = '',
            {core.Columns.tcTime} = 0,
            {core.Columns.tcCategory} = ''
        WHERE
            {core.Columns.tc} = :tc
    """
    model.db.execute_text(sql, tc=tc)

371 

372 ## 

373 

def get_uid(self, account: dict) -> str:
    """Return the value of the admin model's uid column from ``account``.

    Uses ``dict.get``, so a missing key yields ``None``.
    """
    uid_column = self.adminModel.uidName
    return account.get(uid_column)

376 

def make_tc(self):
    """Return a new 32-character random string to be used as a temporary code."""
    # NOTE(review): the code acts as a secret in emailed links; confirm that
    # gws.u.random_string draws from a cryptographically secure source.
    code = gws.u.random_string(32)
    return code

379 

def send_mail(self, account: dict, category: str, args: Optional[dict] = None):
    """Render and send the email for ``category`` to the account's email address.

    The subject comes from the template ``{category}.emailSubject``; the body
    from ``{category}.emailBody``, rendered once as plain text and once as HTML.

    Args:
        account: Account row; its email column must be non-empty.
        category: Template name prefix.
        args: Optional extra template arguments. The dict is not modified;
            the templates receive a copy with an added 'account' entry.

    Raises:
        Error: If the account has no email address.
    """
    email = account.get(core.Columns.email)
    if not email:
        raise Error(f'account {self.get_uid(account)}: no email')

    # Build a fresh dict so the caller's ``args`` is never mutated.
    template_args = {**(args or {}), 'account': account}

    message = gws.plugin.email_helper.Message(
        subject=self.render_template(f'{category}.emailSubject', template_args),
        mailTo=email,
        text=self.render_template(f'{category}.emailBody', template_args, mime='text/plain'),
        html=self.render_template(f'{category}.emailBody', template_args, mime='text/html'),
    )

    email_helper = cast(gws.plugin.email_helper.Object, self.root.app.helper('email'))
    email_helper.send_mail(message)

397 

def render_template(self, subject, args, mime=None):
    """Render the template named ``subject`` with ``args``.

    The template is looked up through the application's template manager,
    searching in this object and optionally restricted by ``mime``.

    Returns:
        The rendered content, or an empty string if no template is found.
    """
    template = self.root.app.templateMgr.find_template(subject, where=[self], mime=mime)
    if not template:
        return ''

    render_input = gws.TemplateRenderInput(args=args)
    return template.render(render_input).content