Coverage for gws-app / gws / lib / sqlitex / __init__.py: 81%
62 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Convenience wrapper for the SA SQLite engine.

This wrapper accepts a database path and, optionally, an "init" DDL statement.
It executes queries given in text form.

Failed queries are attempted up to 3 times to work around transient errors.
A "database is locked" error is not counted towards this limit: the wrapper waits and retries until the lock is released.

If the error message is "no such table", the wrapper runs the "init" DDL before repeating.
10"""
12from typing import Optional
14import gws
15import gws.lib.sa as sa
class Error(gws.Error):
    """Exception type of the sqlitex module, a subclass of ``gws.Error``."""
class Object:
    """SQLite database wrapper that executes text statements with retries.

    The SA engine is created on first use and then reused. Every statement
    runs on a connection obtained from that engine.
    """

    saEngine: sa.Engine

    def __init__(self, db_path: str, init_ddl: Optional[str] = '', connect_args: Optional[dict] = None):
        """Store the configuration; no connection is opened here.

        Args:
            db_path: Path of the SQLite database file.
            init_ddl: Optional DDL text, executed when a statement fails with "no such table".
            connect_args: Optional ``connect_args`` dict passed to ``sa.create_engine``.
        """

        self.dbPath = db_path
        self.initDDL = init_ddl
        self.connectArgs = connect_args or {}

    def execute(self, stmt: str, **params):
        """Execute a text DML statement and commit.

        Args:
            stmt: Statement text, may contain ``:name`` bind parameters.
            **params: Values for the bind parameters.

        Raises:
            Error: If the statement keeps failing.
        """

        self._exec2(False, stmt, params)

    def select(self, stmt: str, **params) -> list[dict]:
        """Execute a text select statement.

        Args:
            stmt: Statement text, may contain ``:name`` bind parameters.
            **params: Values for the bind parameters.

        Returns:
            A list with one dict per result row.

        Raises:
            Error: If the statement keeps failing.
        """

        return self._exec2(True, stmt, params)

    def insert(self, table_name: str, rec: dict):
        """Insert a new record (dict) into a table.

        The table name and the keys of ``rec`` are interpolated into the SQL text as they are,
        only the values of ``rec`` are passed as bind parameters.

        Args:
            table_name: Name of the table.
            rec: Mapping of column names to values.
        """

        keys = ','.join(rec)
        vals = ','.join(':' + k for k in rec)

        self._exec2(False, f'INSERT INTO {table_name} ({keys}) VALUES({vals})', rec)

    def update(self, table_name: str, rec: dict, uid):
        """Update a record (dict) in a table.

        The table name and the keys of ``rec`` are interpolated into the SQL text as they are,
        only the values of ``rec`` and the ``uid`` are passed as bind parameters.

        Args:
            table_name: Name of the table.
            rec: Mapping of column names to new values.
            uid: Value of the ``uid`` column of the record to update.
        """

        vals = ','.join(f'{k}=:{k}' for k in rec)

        # Bind the WHERE value under a name that does not occur in 'rec',
        # otherwise a 'uid' key in 'rec' would silently replace the 'uid' argument.
        where_key = 'uid'
        while where_key in rec:
            where_key = '_' + where_key

        self._exec2(
            False,
            f'UPDATE {table_name} SET {vals} WHERE uid=:{where_key}',
            {**rec, where_key: uid},
        )

    def delete(self, table_name: str, uid):
        """Delete a record by uid from a table.

        The table name is interpolated into the SQL text as it is.

        Args:
            table_name: Name of the table.
            uid: Value of the ``uid`` column of the record to delete.
        """

        self._exec2(False, f'DELETE FROM {table_name} WHERE uid=:uid', {'uid': uid})

    ##

    # Number of counted failures after which a statement is given up.
    _MAX_ERRORS = 3
    # Pause before the next attempt (passed to gws.u.sleep).
    _SLEEP_TIME = 0.1

    def _exec2(self, is_select, stmt, params):
        """Run a statement, repeating it on ``sa.Error``.

        Args:
            is_select: If true, return the result rows, otherwise commit.
            stmt: Statement text.
            params: Dict of bind parameters.

        Returns:
            A list of row dicts for a select, an empty list otherwise.

        Raises:
            Error: After ``_MAX_ERRORS`` counted failures.
        """

        err_cnt = 0
        init_done = False

        while True:
            try:
                with self._engine().connect() as conn:
                    if is_select:
                        return [gws.u.to_dict(r) for r in conn.execute(sa.text(stmt), params)]
                    conn.execute(sa.text(stmt), params)
                    conn.commit()
                    return []
            except sa.Error as exc:
                sa_exc = exc

            # @TODO using strings for error checking, is there a better way?

            # Run the init DDL at most once per call: if it succeeded and the table is still
            # reported missing, running it again would not help and would loop forever.
            if 'no such table' in str(sa_exc) and self.initDDL and not init_done:
                gws.log.warning(f'sqlitex: {self.dbPath}: {sa_exc}, running init...')
                try:
                    with self._engine().connect() as conn:
                        conn.execute(sa.text(self.initDDL))
                        conn.commit()
                    init_done = True
                    continue
                except sa.Error as exc:
                    # the init failure is handled below like any other error
                    sa_exc = exc

            # A locked database is not counted as an error, the statement is repeated until the lock is gone.
            if 'database is locked' in str(sa_exc):
                gws.log.warning(f'sqlitex: {self.dbPath}: locked, waiting...')
                gws.u.sleep(self._SLEEP_TIME)
                continue

            err_cnt += 1
            if err_cnt < self._MAX_ERRORS:
                gws.log.warning(f'sqlitex: {self.dbPath}: {sa_exc}, waiting...')
                gws.u.sleep(self._SLEEP_TIME)
                continue

            # 'Error' is a subclass of 'gws.Error', so callers catching 'gws.Error' keep working.
            raise Error(f'sqlitex: {self.dbPath}: {sa_exc}') from sa_exc

    def _engine(self):
        """Return the SA engine for ``dbPath``, creating it on first use."""

        # 'saEngine' is only annotated on the class, so it does not exist until first assigned here.
        if getattr(self, 'saEngine', None) is None:
            self.saEngine = sa.create_engine(
                f'sqlite:///{self.dbPath}',
                poolclass=sa.NullPool,
                echo=False,
                connect_args=self.connectArgs,
            )
        return self.saEngine