Coverage for gws-app/gws/plugin/auth_session_manager/sqlite/__init__.py: 93%

60 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 23:09 +0200

1from typing import Optional 

2 

3import gws 

4import gws.base.auth 

5import gws.lib.datetimex 

6import gws.lib.jsonx 

7import gws.lib.osx 

8import gws.lib.sqlitex 

9 

# NOTE(review): presumably marks this module as the 'sqlite' implementation of
# the authSessionManager extension -- confirm against gws.ext.
gws.ext.new.authSessionManager('sqlite')

11 

12 

class Config(gws.base.auth.session_manager.Config):
    """Configuration for sqlite sessions.

    Adds an optional location of the sqlite database file to the base
    session manager configuration.
    """

    path: Optional[str]
    """Session storage path (optional, the manager supplies a default)."""

18 

19 

class Object(gws.base.auth.session_manager.Object):
    """Session manager that keeps sessions in a single sqlite table.

    Every session is one row of ``table``, keyed by ``uid``. The user and the
    session data are stored as serialized strings; ``created`` and ``updated``
    are INTEGER columns filled from ``gws.u.stime()``.
    """

    dbPath: str
    table = 'sessions'

    def configure(self):
        """Set ``dbPath`` from the ``path`` option.

        ``sessions82.sqlite`` under ``gws.c.MISC_DIR`` is passed as the default.
        """
        self.dbPath = self.cfg('path', default=f'{gws.c.MISC_DIR}/sessions82.sqlite')

    ##

    def cleanup(self):
        """Delete all sessions whose ``updated`` value is older than ``lifeTime``."""
        # NOTE(review): ``lifeTime`` is not set in this class; presumably it is
        # provided by the base session manager -- confirm.
        last_time = gws.u.stime() - self.lifeTime
        self._db().execute(f'DELETE FROM {self.table} WHERE updated < :last_time', last_time=last_time)

    def create(self, method, user, data=None):
        """Store a new session and return it.

        Args:
            method: Authorization method; its ``uid`` is stored in the row.
            user: Session user; stored by ``uid`` and in serialized form.
            data: Optional session data, stored as JSON. A falsy value is
                stored as an empty dict.

        Returns:
            The new session, as read back from the database by ``get``.
        """
        am = self.root.app.authMgr
        uid = gws.u.random_string(64)

        # Read the clock once, so that a fresh session always starts with
        # identical ``created`` and ``updated`` values.
        now = gws.u.stime()

        self._db().insert(self.table, dict(
            uid=uid,
            method_uid=method.uid,
            user_uid=user.uid,
            str_user=am.serialize_user(user),
            str_data=gws.lib.jsonx.to_string(data or {}),
            created=now,
            updated=now,
        ))

        return self.get(uid)

    def delete(self, sess):
        """Delete the row that belongs to the session ``sess``."""
        self._db().execute(f'DELETE FROM {self.table} WHERE uid=:uid', uid=sess.uid)

    def delete_all(self):
        """Delete all rows from the session table."""
        self._db().execute(f'DELETE FROM {self.table}')

    def get(self, uid):
        """Return the session with the given ``uid``, regardless of its age.

        Returns:
            The session object, or ``None`` if the query does not yield
            exactly one row.
        """
        rs = self._db().select(f'SELECT * FROM {self.table} WHERE uid=:uid', uid=uid)
        if len(rs) == 1:
            return self._session(rs[0])
        return None

    def get_valid(self, uid):
        """Return the session with the given ``uid`` if it has not expired.

        A session counts as valid when its ``updated`` value is not older
        than ``lifeTime``, measured from the current ``gws.u.stime()``.

        Returns:
            The session object, or ``None`` if there is no such row or the
            row is too old.
        """
        last_time = gws.u.stime() - self.lifeTime
        rs = self._db().select(f'SELECT * FROM {self.table} WHERE uid=:uid', uid=uid)
        if len(rs) == 1:
            rec = gws.u.to_dict(rs[0])
            if rec['updated'] >= last_time:
                return self._session(rec)
        return None

    def get_all(self):
        """Return a list with a session object for every stored row."""
        return [
            self._session(rec)
            for rec in self._db().select(f'SELECT * FROM {self.table}')
        ]

    def save(self, sess):
        """Write the session data and a new ``updated`` value to the database.

        Does nothing unless ``sess.isChanged`` is set; resets the flag after
        the update has been issued.
        """
        if not sess.isChanged:
            return

        self._db().execute(
            f'UPDATE {self.table} SET str_data=:str_data, updated=:updated WHERE uid=:uid',
            str_data=gws.lib.jsonx.to_string(sess.data or {}),
            updated=gws.u.stime(),
            uid=sess.uid
        )

        sess.isChanged = False

    def touch(self, sess):
        """Refresh the ``updated`` value of the session.

        A changed session is passed to ``save`` instead, which writes the
        data and the new ``updated`` value in one statement.
        """
        if sess.isChanged:
            return self.save(sess)

        self._db().execute(
            f'UPDATE {self.table} SET updated=:updated WHERE uid=:uid',
            updated=gws.u.stime(),
            uid=sess.uid
        )

    ##

    def _session(self, rec):
        """Build a session object from a database record.

        Args:
            rec: A row of the session table; anything accepted by
                ``gws.u.to_dict`` (``get_valid`` passes a dict, the other
                callers pass rows as returned by ``select``).
        """
        am = self.root.app.authMgr
        r = gws.u.to_dict(rec)
        return gws.base.auth.session.Object(
            uid=r['uid'],
            method=am.get_method(r['method_uid']),
            user=am.unserialize_user(r['str_user']),
            data=gws.lib.jsonx.from_string(r['str_data']),
            created=gws.lib.datetimex.from_timestamp(r['created']),
            updated=gws.lib.datetimex.from_timestamp(r['updated']),
        )

    ##

    # Annotation only: no value is assigned here, which is why ``_db`` probes
    # the attribute with ``getattr`` and a default.
    _sqlitex: gws.lib.sqlitex.Object

    def _db(self):
        """Return the database wrapper, creating it on first use.

        The wrapper is constructed with ``dbPath`` and the DDL of the
        session table and is then cached on the instance.
        """
        if getattr(self, '_sqlitex', None) is None:
            # NOTE(review): presumably sqlitex runs this DDL when the table
            # is missing -- confirm in gws.lib.sqlitex.
            ddl = f'''
                CREATE TABLE IF NOT EXISTS {self.table} (
                    uid TEXT NOT NULL PRIMARY KEY,
                    method_uid TEXT NOT NULL,
                    user_uid TEXT NOT NULL,
                    str_user TEXT NOT NULL,
                    str_data TEXT NOT NULL,
                    created INTEGER NOT NULL,
                    updated INTEGER NOT NULL
                )
            '''
            self._sqlitex = gws.lib.sqlitex.Object(self.dbPath, ddl)
        return self._sqlitex