Coverage for gws-app/gws/base/auth/mfa.py: 75%

79 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""Generic multi-factor authentication adapter. 

2 

3Multi-factor authentication (handled in ``gws.plugin.auth_method.web.core`) 

4is used for ``User`` object that provide the attribute ``mfaUid``, 

5which is supposed to be an ID of a configured MFA Adapter. 

6 

7Specific MFA Adapters can require other attributes. 

8 

9Multi-factor authentication starts by creating a `gws.AuthMultiFactorTransaction` object, 

10kept in a session until it is verified or expires. 

11 

12Some Adapters can be restarted (e.g. by resending a verification email). 

13""" 

14 

15from typing import Optional 

16 

17import gws 

18import gws.lib.otp 

19 

20 

21class OtpConfig: 

22 """OTP generation options.""" 

23 

24 start: Optional[int] 

25 """Start time for TOTP.""" 

26 step: Optional[int] 

27 """Step time for TOTP.""" 

28 length: Optional[int] 

29 """Length of the OTP code.""" 

30 tolerance: Optional[int] 

31 """Tolerance window for TOTP verification.""" 

32 algo: Optional[str] 

33 """Hash algorithm for OTP generation.""" 

34 

35 

36class Config(gws.Config): 

37 """Multi-factor authorization configuration.""" 

38 

39 message: str = '' 

40 """Message to display in the client.""" 

41 lifeTime: Optional[gws.Duration] = '120' 

42 """How long to wait for the MFA to complete.""" 

43 maxVerifyAttempts: int = 3 

44 """Max verify attempts.""" 

45 maxRestarts: int = 0 

46 """Max code regeneration attempts.""" 

47 otp: Optional[OtpConfig] 

48 """OTP generation options""" 

49 

50 

51class Object(gws.AuthMultiFactorAdapter): 

52 otpOptions: gws.lib.otp.Options 

53 

54 def configure(self): 

55 self.message = self.cfg('message', default='') 

56 self.lifeTime = self.cfg('lifeTime', default=120) 

57 self.maxVerifyAttempts = self.cfg('maxVerifyAttempts', default=3) 

58 self.maxRestarts = self.cfg('maxRestarts', default=0) 

59 self.otpOptions = gws.u.merge(gws.lib.otp.DEFAULTS, self.cfg('otp')) 

60 

61 def start(self, user): 

62 return gws.AuthMultiFactorTransaction( 

63 state=gws.AuthMultiFactorState.open, 

64 restartCount=0, 

65 verifyCount=0, 

66 secret='', 

67 startTime=self.current_timestamp(), 

68 generateTime=0, 

69 message=self.message, 

70 adapter=self, 

71 user=user, 

72 ) 

73 

74 def check_state(self, mfa): 

75 ts = self.current_timestamp() 

76 if ts - mfa.startTime >= self.lifeTime: 

77 mfa.state = gws.AuthMultiFactorState.failed 

78 return False 

79 if mfa.verifyCount > self.maxVerifyAttempts: 

80 mfa.state = gws.AuthMultiFactorState.failed 

81 return False 

82 if mfa.state == gws.AuthMultiFactorState.failed: 

83 return False 

84 return True 

85 

86 def check_restart(self, mfa): 

87 return mfa.restartCount < self.maxRestarts 

88 

89 def restart(self, mfa): 

90 rc = mfa.restartCount + 1 

91 if rc > self.maxRestarts: 

92 return 

93 

94 mfa = self.start(mfa.user) 

95 if not mfa: 

96 return 

97 

98 mfa.restartCount = rc 

99 return mfa 

100 

101 ## 

102 

103 def verify_attempt(self, mfa, payload_valid: bool): 

104 mfa.verifyCount += 1 

105 

106 if not self.check_state(mfa): 

107 return mfa 

108 

109 if payload_valid: 

110 mfa.state = gws.AuthMultiFactorState.ok 

111 return mfa 

112 

113 if mfa.verifyCount >= self.maxVerifyAttempts: 

114 mfa.state = gws.AuthMultiFactorState.failed 

115 return mfa 

116 

117 mfa.state = gws.AuthMultiFactorState.retry 

118 return mfa 

119 

120 def generate_totp(self, mfa: gws.AuthMultiFactorTransaction) -> str: 

121 ts = self.current_timestamp() 

122 totp = gws.lib.otp.new_totp(mfa.secret, ts, self.otpOptions) 

123 mfa.generateTime = ts 

124 gws.log.debug(f'generate_totp {ts=} {totp=} {mfa.generateTime=}') 

125 return totp 

126 

127 def check_totp(self, mfa: gws.AuthMultiFactorTransaction, input: str) -> bool: 

128 return gws.lib.otp.check_totp( 

129 str(input or ''), 

130 mfa.secret, 

131 self.current_timestamp(), 

132 self.otpOptions, 

133 ) 

134 

135 def current_timestamp(self): 

136 return gws.u.stime()