Coverage for gws-app / gws / plugin / auth_session_manager / sqlite / __init__.py: 94%
64 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1from typing import Optional
3import gws
4import gws.base.auth
5import gws.lib.datetimex
6import gws.lib.jsonx
7import gws.lib.sqlitex
9gws.ext.new.authSessionManager('sqlite')
12class Config(gws.base.auth.session_manager.Config):
13 """Configuration for sqlite sessions"""
15 path: Optional[str]
16 """Session storage path."""
19class Object(gws.base.auth.session_manager.Object):
20 dbPath: str
21 table = 'sessions'
23 def configure(self):
24 ver = self.root.specs.version.rpartition('.')[0]
25 self.dbPath = self.cfg('path', default=f'{gws.c.MISC_DIR}/sessions.{ver}.sqlite')
27 ##
29 def cleanup(self):
30 last_time = gws.u.stime() - self.lifeTime
31 self._db().execute(f'DELETE FROM {self.table} WHERE updated < :last_time', last_time=last_time)
33 def create(self, method, user, data=None):
34 am = self.root.app.authMgr
35 uid = gws.u.random_string(64)
37 self._db().insert(self.table, dict(
38 uid=uid,
39 method_uid=method.uid,
40 user_uid=user.uid,
41 str_user=am.serialize_user(user),
42 str_data=gws.lib.jsonx.to_string(data or {}),
43 created=gws.u.stime(),
44 updated=gws.u.stime(),
45 ))
47 return self.get(uid)
49 def delete(self, sess):
50 self._db().execute(f'DELETE FROM {self.table} WHERE uid=:uid', uid=sess.uid)
52 def delete_all(self):
53 self._db().execute(f'DELETE FROM {self.table}')
55 def get(self, uid):
56 rs = self._db().select(f'SELECT * FROM {self.table} WHERE uid=:uid', uid=uid)
57 if len(rs) == 1:
58 return self._session(rs[0])
60 def get_valid(self, uid):
61 last_time = gws.u.stime() - self.lifeTime
62 rs = self._db().select(f'SELECT * FROM {self.table} WHERE uid=:uid', uid=uid)
63 if len(rs) == 1:
64 rec = gws.u.to_dict(rs[0])
65 if rec['updated'] >= last_time:
66 return self._session(rec)
68 def get_all(self):
69 return [
70 self._session(rec)
71 for rec in self._db().select(f'SELECT * FROM {self.table}')
72 ]
74 def save(self, sess):
75 if not sess.isChanged:
76 return
78 self._db().execute(
79 f'UPDATE {self.table} SET str_data=:str_data, updated=:updated WHERE uid=:uid',
80 str_data=gws.lib.jsonx.to_string(sess.data or {}),
81 updated=gws.u.stime(),
82 uid=sess.uid
83 )
85 sess.isChanged = False
87 def touch(self, sess):
88 if sess.isChanged:
89 return self.save(sess)
91 self._db().execute(
92 f'UPDATE {self.table} SET updated=:updated WHERE uid=:uid',
93 updated=gws.u.stime(),
94 uid=sess.uid
95 )
97 ##
99 def _session(self, rec):
100 am = self.root.app.authMgr
101 r = gws.u.to_dict(rec)
102 usr = am.unserialize_user(r['str_user'])
103 if not usr:
104 gws.log.error(f'invalid user in session {r["uid"]!r}')
105 usr = am.guestUser
106 return gws.base.auth.session.Object(
107 uid=r['uid'],
108 method=am.get_method(r['method_uid']),
109 user=usr,
110 data=gws.lib.jsonx.from_string(r['str_data']),
111 created=gws.lib.datetimex.from_timestamp(r['created']),
112 updated=gws.lib.datetimex.from_timestamp(r['updated']),
113 )
115 ##
117 _sqlitex: gws.lib.sqlitex.Object
119 def _db(self):
120 if getattr(self, '_sqlitex', None) is None:
121 ddl = f'''
122 CREATE TABLE IF NOT EXISTS {self.table} (
123 uid TEXT NOT NULL PRIMARY KEY,
124 method_uid TEXT NOT NULL,
125 user_uid TEXT NOT NULL,
126 str_user TEXT NOT NULL,
127 str_data TEXT NOT NULL,
128 created INTEGER NOT NULL,
129 updated INTEGER NOT NULL
130 )
131 '''
132 self._sqlitex = gws.lib.sqlitex.Object(self.dbPath, ddl)
133 return self._sqlitex