Coverage for gws-app/gws/plugin/account/helper.py: 0%
211 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1from typing import Optional, cast
3import gws
4import gws.base.edit.helper
5import gws.config.util
6import gws.plugin.email_helper
7import gws.lib.image
8import gws.lib.net
9import gws.lib.otp
11from . import core
13gws.ext.new.helper('account')
16class MfaConfig:
17 """Multi-factor authentication configuration."""
19 mfaUid: str
20 """UID of the multi-factor authentication adapter."""
21 title: str
22 """Title of the multi-factor authentication method."""
25class Config(gws.Config):
26 """Account helper."""
28 adminModel: gws.ext.config.model
29 """Edit model for account administration."""
30 userModel: Optional[gws.ext.config.model]
31 """Edit model for end-users accounts."""
32 templates: list[gws.ext.config.template]
33 """Templates"""
35 usernameColumn: str = 'email'
36 """Column used as 'login'."""
38 passwordCreateSql: Optional[str]
39 """SQL expression for computing password hashes."""
40 passwordVerifySql: Optional[str]
41 """SQL expression for verifying password hashes."""
43 tcLifeTime: gws.Duration = '3600'
44 """Life time for temporary codes."""
46 mfa: Optional[list[MfaConfig]]
47 """Multi-factor authentication methods the user can choose from."""
48 mfaIssuer: str = ''
49 """Issuer name for Multi-factor key uris (qr codes)."""
51 onboardingUrl: str
52 """URL for email onboarding."""
53 onboardingCompletionUrl: str = ''
54 """URL to redirect after onboarding."""
57##
60class Error(gws.Error):
61 """Account-related error."""
63 pass
66##
69class MfaOption(gws.Data):
70 index: int
71 title: str
72 adapter: Optional[gws.AuthMultiFactorAdapter]
75_DEFAULT_PASSWORD_CREATE_SQL = "crypt( {password}, gen_salt('bf') )"
76_DEFAULT_PASSWORD_VERIFY_SQL = 'crypt( {password}, {passwordColumn} )'
79class Object(gws.base.edit.helper.Object):
80 adminModel: gws.DatabaseModel
81 userModel: gws.DatabaseModel
82 templates: list[gws.Template]
84 mfaIssuer: str
85 mfaOptions: list[MfaOption]
86 onboardingUrl: str
87 onboardingCompletionUrl: str
88 passwordCreateSql: str
89 passwordVerifySql: str
90 tcLifeTime: int
91 usernameColumn: str
93 def configure(self):
94 self.configure_templates()
96 self.adminModel = cast(gws.DatabaseModel, self.create_child(gws.ext.object.model, self.cfg('adminModel')))
98 self.mfaIssuer = self.cfg('mfaIssuer')
99 self.mfaOptions = []
101 self.onboardingUrl = self.cfg('onboardingUrl')
102 self.onboardingCompletionUrl = self.cfg('onboardingCompletionUrl') or self.onboardingUrl
104 self.passwordCreateSql = self.cfg('passwordCreateSql', default=_DEFAULT_PASSWORD_CREATE_SQL)
105 self.passwordVerifySql = self.cfg('passwordVerifySql', default=_DEFAULT_PASSWORD_VERIFY_SQL)
107 self.tcLifeTime = self.cfg('tcLifeTime', default=3600)
109 self.usernameColumn = self.cfg('usernameColumn', default=core.Columns.email)
111 def configure_templates(self):
112 return gws.config.util.configure_templates_for(self)
114 def post_configure(self):
115 for n, c in enumerate(self.cfg('mfa', default=[]), 1):
116 opt = MfaOption(index=n, title=c.title)
117 if c.mfaUid:
118 opt.adapter = self.root.get(c.mfaUid)
119 if not opt.adapter:
120 raise gws.ConfigurationError(f'MFA Adapter not found {c.mfaUid=}')
121 self.mfaOptions.append(opt)
123 ##
125 def get_models(self, req, p):
126 return [self.adminModel]
128 def write_feature(self, req, p):
129 is_new = p.feature.isNew
130 f = super().write_feature(req, p)
132 if f and not f.errors and is_new:
133 account = self.get_account_by_id(f.uid())
134 self.reset(account)
136 return f
138 ##
140 def get_account_by_id(self, uid: str) -> Optional[dict]:
141 sql = f"""
142 SELECT * FROM {self.adminModel.tableName}
143 WHERE {self.adminModel.uidName}=:uid
144 """
145 rs = self.adminModel.db.select_text(sql, uid=uid)
146 return rs[0] if rs else None
148 def get_account_by_credentials(self, credentials: gws.Data, expected_status: Optional[core.Status] = None) -> Optional[dict]:
149 expr = self.passwordVerifySql
150 expr = expr.replace('{password}', ':password')
151 expr = expr.replace('{passwordColumn}', core.Columns.password)
153 username = credentials.get('username')
154 password = credentials.get('password')
156 sql = f"""
157 SELECT
158 {self.adminModel.uidName},
159 ( {core.Columns.password} = {expr} ) AS validpassword,
160 {core.Columns.status}
161 FROM
162 {self.adminModel.tableName}
163 WHERE
164 {self.usernameColumn} = :username
165 """
166 rs = self.adminModel.db.select_text(sql, username=username, password=password)
168 if not rs:
169 gws.log.warning(f'get_account_by_credentials: {username=} not found')
170 return
172 if len(rs) > 1:
173 raise Error(f'get_account_by_credentials: multiple entries for {username=}')
175 r = rs[0]
177 if not r.get('validpassword'):
178 raise Error(f'get_account_by_credentials: {username=} wrong password')
180 if expected_status:
181 status = r.get(core.Columns.status)
182 if status != expected_status:
183 raise Error(f'get_account_by_credentials: {username=} wrong {status=} {expected_status=}')
185 return self.get_account_by_id(self.get_uid(r))
187 def get_account_by_tc(self, tc: str, category: str, expected_status: Optional[core.Status] = None) -> Optional[dict]:
188 sql = f"""
189 SELECT
190 {self.adminModel.uidName},
191 {core.Columns.tcTime},
192 {core.Columns.tcCategory},
193 {core.Columns.status}
194 FROM
195 {self.adminModel.tableName}
196 WHERE
197 {core.Columns.tc} = :tc
198 """
199 rs = self.adminModel.db.select_text(sql, tc=tc)
201 if not rs:
202 gws.log.warning(f'get_account_by_tc: {tc=} not found')
203 return
205 self.invalidate_tc(tc)
207 if len(rs) > 1:
208 raise Error(f'get_account_by_tc: {tc=} multiple entries')
210 r = rs[0]
212 if r.get(core.Columns.tcCategory) != category:
213 gws.log.warning(f'get_account_by_tc: {category=} {tc=} wrong category')
214 return
216 if gws.u.stime() - r.get(core.Columns.tcTime, 0) > self.tcLifeTime:
217 gws.log.warning(f'get_account_by_tc: {category=} {tc=} expired')
218 return
220 if expected_status:
221 status = r.get(core.Columns.status)
222 if status != expected_status:
223 gws.log.warning(f'get_account_by_tc: {category=} {tc=} wrong {status=} {expected_status=}')
224 return
226 return self.get_account_by_id(self.get_uid(r))
228 ##
230 def set_password(self, account: dict, password):
231 expr = self.passwordCreateSql
232 expr = expr.replace('{password}', ':password')
233 expr = expr.replace('{passwordColumn}', core.Columns.password)
235 sql = f"""
236 UPDATE {self.adminModel.tableName}
237 SET
238 {core.Columns.password} = {expr}
239 WHERE
240 {self.adminModel.uidName} = :uid
241 """
242 self.adminModel.db.execute_text(sql, password=password, uid=self.get_uid(account))
244 def validate_password(self, password: str) -> bool:
245 if len(password.strip()) == 0:
246 return False
247 # @TODO password complexity validation
248 return True
250 ##
252 def set_mfa(self, account: dict, mfa_option_index: int):
253 mfa_uid = None
255 for mo in self.mfa_options(account):
256 if mo.index == mfa_option_index:
257 mfa_uid = mo.adapter.uid if mo.adapter else ''
258 break
260 if mfa_uid is None:
261 raise Error(f'{mfa_option_index=} not found')
263 sql = f"""
264 UPDATE {self.adminModel.tableName}
265 SET
266 {core.Columns.mfaUid} = :mfa_uid
267 WHERE
268 {self.adminModel.uidName} = :uid
269 """
270 self.adminModel.db.execute_text(sql, mfa_uid=mfa_uid, uid=self.get_uid(account))
272 def mfa_options(self, account: dict) -> list[MfaOption]:
273 # @TODO different options per account
274 return self.mfaOptions
276 def generate_mfa_secret(self, account: dict) -> str:
277 secret = gws.lib.otp.random_secret()
279 sql = f"""
280 UPDATE {self.adminModel.tableName}
281 SET
282 {core.Columns.mfaSecret} = :secret
283 WHERE
284 {self.adminModel.uidName} = :uid
285 """
286 self.adminModel.db.execute_text(sql, secret=secret, uid=self.get_uid(account))
288 return secret
290 def qr_code_for_mfa(self, account: dict, mo: MfaOption, secret: str) -> str:
291 if not mo.adapter:
292 return ''
293 url = mo.adapter.key_uri(secret, self.mfaIssuer, account.get(self.usernameColumn))
294 if not url:
295 return ''
296 return gws.lib.image.qr_code(url).to_data_url()
298 ##
300 def set_status(self, account: dict, status: core.Status):
301 sql = f"""
302 UPDATE {self.adminModel.tableName}
303 SET
304 {core.Columns.status} = :status
305 WHERE
306 {self.adminModel.uidName} = :uid
307 """
308 self.adminModel.db.execute_text(sql, status=status, uid=self.get_uid(account))
310 def reset(self, account: dict):
311 sql = f"""
312 UPDATE {self.adminModel.tableName}
313 SET
314 {core.Columns.status} = :status,
315 {core.Columns.password} = '',
316 {core.Columns.mfaSecret} = ''
317 WHERE
318 {self.adminModel.uidName} = :uid
319 """
320 self.adminModel.db.execute_text(sql, status=core.Status.new, uid=self.get_uid(account))
322 if self.onboardingUrl:
323 self.send_onboarding_email(account)
325 ##
327 def send_onboarding_email(self, account: dict):
328 tc = self.generate_tc(account, core.Category.onboarding)
329 url = gws.lib.net.add_params(self.onboardingUrl, onboarding=tc)
330 self.send_mail(account, core.Category.onboarding, {'url': url})
332 def generate_tc(self, account: dict, category: str) -> str:
333 tc = self.make_tc()
335 sql = f"""
336 UPDATE {self.adminModel.tableName}
337 SET
338 {core.Columns.tc} = :tc,
339 {core.Columns.tcTime} = :time,
340 {core.Columns.tcCategory} = :category
341 WHERE
342 {self.adminModel.uidName} = :uid
343 """
344 self.adminModel.db.execute_text(sql, tc=tc, time=gws.u.stime(), category=category, uid=self.get_uid(account))
346 return tc
348 def clear_tc(self, account: dict):
349 sql = f"""
350 UPDATE {self.adminModel.tableName}
351 SET
352 {core.Columns.tc} = '',
353 {core.Columns.tcTime} = 0,
354 {core.Columns.tcCategory} = ''
355 WHERE
356 {self.adminModel.uidName} = :uid
357 """
358 self.adminModel.db.execute_text(sql, uid=self.get_uid(account))
360 def invalidate_tc(self, tc: str):
361 sql = f"""
362 UPDATE {self.adminModel.tableName}
363 SET
364 {core.Columns.tc} = '',
365 {core.Columns.tcTime} = 0,
366 {core.Columns.tcCategory} = ''
367 WHERE
368 {core.Columns.tc} = :tc
369 """
370 self.adminModel.db.execute_text(sql, tc=tc)
372 ##
374 def get_uid(self, account: dict) -> str:
375 return account.get(self.adminModel.uidName)
377 def make_tc(self):
378 return gws.u.random_string(32)
380 def send_mail(self, account: dict, category: str, args: Optional[dict] = None):
381 email = account.get(core.Columns.email)
382 if not email:
383 raise Error(f'account {self.get_uid(account)}: no email')
385 args = args or {}
386 args['account'] = account
388 message = gws.plugin.email_helper.Message(
389 subject=self.render_template(f'{category}.emailSubject', args),
390 mailTo=email,
391 text=self.render_template(f'{category}.emailBody', args, mime='text/plain'),
392 html=self.render_template(f'{category}.emailBody', args, mime='text/html'),
393 )
395 email_helper = cast(gws.plugin.email_helper.Object, self.root.app.helper('email'))
396 email_helper.send_mail(message)
398 def render_template(self, subject, args, mime=None):
399 tpl = self.root.app.templateMgr.find_template(subject, where=[self], mime=mime)
400 if tpl:
401 res = tpl.render(gws.TemplateRenderInput(args=args))
402 return res.content
403 return ''