Coverage for gws-app / gws / plugin / auth_session_manager / sqlite / __init__.py: 94%

64 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1from typing import Optional 

2 

3import gws 

4import gws.base.auth 

5import gws.lib.datetimex 

6import gws.lib.jsonx 

7import gws.lib.sqlitex 

8 

# Extension registration statement for the 'sqlite' session manager.
# NOTE(review): presumably this marks the module as the 'sqlite' implementation
# of the `authSessionManager` extension type - confirm against the gws.ext machinery.
9gws.ext.new.authSessionManager('sqlite') 

10 

11 

class Config(gws.base.auth.session_manager.Config):
    """Configuration for sqlite sessions"""

    # Optional location of the SQLite database file. When it is not set,
    # `Object.configure` falls back to a versioned file under `gws.c.MISC_DIR`.
    path: Optional[str]
    """Session storage path."""

17 

18 

class Object(gws.base.auth.session_manager.Object):
    """Session manager that keeps auth sessions in one SQLite table.

    Each session is a row of the table named by ``table``, keyed by ``uid``.
    The user and the session data are stored as serialized strings
    (``str_user``, ``str_data``); ``created`` and ``updated`` hold the values
    returned by ``gws.u.stime()``.

    The database object is created lazily on first access (see ``_db``).

    NOTE(review): ``self.lifeTime`` is read here but never assigned in this
    class - presumably it is provided by the base class; confirm.
    """

    dbPath: str
    table = 'sessions'

    def configure(self):
        """Resolve the database file path from the configuration.

        Uses the configured ``path`` if present; otherwise builds a default
        file name under ``gws.c.MISC_DIR``.
        """
        # Keep everything before the last dot of the version string, so the
        # default file name only changes when that prefix changes.
        ver = self.root.specs.version.rpartition('.')[0]
        self.dbPath = self.cfg('path', default=f'{gws.c.MISC_DIR}/sessions.{ver}.sqlite')

    ##

    def cleanup(self):
        """Delete all sessions whose ``updated`` value is older than ``now - lifeTime``."""
        last_time = gws.u.stime() - self.lifeTime
        self._db().execute(f'DELETE FROM {self.table} WHERE updated < :last_time', last_time=last_time)

    def create(self, method, user, data=None):
        """Insert a new session row and return the stored session.

        Args:
            method: Object whose ``uid`` is stored as ``method_uid``.
            user: Object whose ``uid`` is stored as ``user_uid``; it is also
                serialized with the auth manager's ``serialize_user``.
            data: Optional session payload; a falsy value is stored as ``{}``.

        Returns:
            The session as read back from the database via ``get``.
        """
        am = self.root.app.authMgr
        uid = gws.u.random_string(64)

        # Read the clock once so that a fresh session always has
        # `created == updated`, even if the call straddles a clock tick.
        now = gws.u.stime()

        self._db().insert(self.table, dict(
            uid=uid,
            method_uid=method.uid,
            user_uid=user.uid,
            str_user=am.serialize_user(user),
            str_data=gws.lib.jsonx.to_string(data or {}),
            created=now,
            updated=now,
        ))

        return self.get(uid)

    def delete(self, sess):
        """Delete the row whose ``uid`` equals ``sess.uid``."""
        self._db().execute(f'DELETE FROM {self.table} WHERE uid=:uid', uid=sess.uid)

    def delete_all(self):
        """Delete every row of the session table."""
        self._db().execute(f'DELETE FROM {self.table}')

    def get(self, uid):
        """Return the session with the given ``uid``, regardless of its age.

        Returns:
            The session object, or ``None`` unless exactly one row matches.
        """
        rs = self._db().select(f'SELECT * FROM {self.table} WHERE uid=:uid', uid=uid)
        if len(rs) == 1:
            return self._session(rs[0])
        return None

    def get_valid(self, uid):
        """Return the session with the given ``uid`` if it has not expired.

        A session counts as valid when its ``updated`` value is not older
        than ``now - lifeTime``.

        Returns:
            The session object, or ``None`` if there is no single matching
            row or the row is too old.
        """
        last_time = gws.u.stime() - self.lifeTime
        rs = self._db().select(f'SELECT * FROM {self.table} WHERE uid=:uid', uid=uid)
        if len(rs) == 1:
            rec = gws.u.to_dict(rs[0])
            if rec['updated'] >= last_time:
                return self._session(rec)
        return None

    def get_all(self):
        """Return a list with one session object per row of the table."""
        return [
            self._session(rec)
            for rec in self._db().select(f'SELECT * FROM {self.table}')
        ]

    def save(self, sess):
        """Persist the session data if the session is marked as changed.

        Does nothing when ``sess.isChanged`` is falsy. Otherwise writes the
        serialized ``sess.data`` (``{}`` if falsy) together with a fresh
        ``updated`` value, then resets ``sess.isChanged`` to ``False``.
        """
        if not sess.isChanged:
            return

        self._db().execute(
            f'UPDATE {self.table} SET str_data=:str_data, updated=:updated WHERE uid=:uid',
            str_data=gws.lib.jsonx.to_string(sess.data or {}),
            updated=gws.u.stime(),
            uid=sess.uid
        )

        sess.isChanged = False

    def touch(self, sess):
        """Refresh the ``updated`` value of the session row.

        A changed session is handed to ``save``, which writes the data and
        ``updated`` in a single statement; otherwise only ``updated`` is set.
        """
        if sess.isChanged:
            return self.save(sess)

        self._db().execute(
            f'UPDATE {self.table} SET updated=:updated WHERE uid=:uid',
            updated=gws.u.stime(),
            uid=sess.uid
        )

    ##

    def _session(self, rec):
        """Build a session object from a database record.

        If the stored user cannot be unserialized, the problem is logged and
        the auth manager's ``guestUser`` is used instead.
        """
        am = self.root.app.authMgr
        r = gws.u.to_dict(rec)
        usr = am.unserialize_user(r['str_user'])
        if not usr:
            gws.log.error(f'invalid user in session {r["uid"]!r}')
            usr = am.guestUser
        return gws.base.auth.session.Object(
            uid=r['uid'],
            method=am.get_method(r['method_uid']),
            user=usr,
            data=gws.lib.jsonx.from_string(r['str_data']),
            created=gws.lib.datetimex.from_timestamp(r['created']),
            updated=gws.lib.datetimex.from_timestamp(r['updated']),
        )

    ##

    _sqlitex: gws.lib.sqlitex.Object

    def _db(self):
        """Return the database object, creating it on first use.

        The object is constructed with ``dbPath`` and the DDL of the session
        table, and then cached on the instance as ``_sqlitex``.
        """
        # `_sqlitex` is only annotated on the class, not assigned, hence getattr.
        if getattr(self, '_sqlitex', None) is None:
            ddl = f'''
                CREATE TABLE IF NOT EXISTS {self.table} (
                    uid TEXT NOT NULL PRIMARY KEY,
                    method_uid TEXT NOT NULL,
                    user_uid TEXT NOT NULL,
                    str_user TEXT NOT NULL,
                    str_data TEXT NOT NULL,
                    created INTEGER NOT NULL,
                    updated INTEGER NOT NULL
                )
            '''
            self._sqlitex = gws.lib.sqlitex.Object(self.dbPath, ddl)
        return self._sqlitex