Coverage for gws-app/gws/lib/sqlitex/__init__.py: 75%
61 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
1"""Convenience wrapper for the SA SQLite engine.
3This wrapper accepts a database path and optionally an "init" DDL statement.
4It executes queries given in a text form.
6Failed queries are repeated up to 3 times to work around transient errors, like the DB being locked.
8If the error message is "no such table", the wrapper runs the "init" DDL before repeating.
10"""
12from typing import Optional
14import gws
15import gws.lib.sa as sa
18class Error(gws.Error):
19 pass
22class Object:
23 saEngine: sa.Engine
25 def __init__(self, db_path: str, init_ddl: Optional[str] = ''):
26 self.dbPath = db_path
27 self.initDDL = init_ddl
29 def execute(self, stmt: str, **params):
30 """Execute a text DML statement and commit."""
32 self._exec2(False, stmt, params)
34 def select(self, stmt: str, **params) -> list[dict]:
35 """Execute a text select statement."""
37 return self._exec2(True, stmt, params)
39 def insert(self, table_name: str, rec: dict):
40 """Insert a new record (dict) into a table."""
42 keys = ','.join(rec)
43 vals = ','.join(':' + k for k in rec)
45 self._exec2(False, f'INSERT INTO {table_name} ({keys}) VALUES({vals})', rec)
47 def update(self, table_name: str, rec: dict, uid):
48 """Update a record (dict) in a table."""
50 vals = ','.join(f'{k}=:{k}' for k in rec)
51 self._exec2(False, f'UPDATE {table_name} SET {vals} WHERE uid=:uid', {'uid': uid, **rec})
53 def delete(self, table_name: str, uid):
54 """Delete a record by uid from a table."""
56 self._exec2(False, f'DELETE FROM {table_name} WHERE uid=:uid', {'uid': uid})
58 ##
60 _MAX_ERRORS = 3
61 _SLEEP_TIME = 0.1
63 def _exec2(self, is_select, stmt, params):
64 err_cnt = 0
66 while True:
67 sa_exc = None
69 try:
70 with self._engine().connect() as conn:
71 if is_select:
72 return [gws.u.to_dict(r) for r in conn.execute(sa.text(stmt), params)]
73 conn.execute(sa.text(stmt), params)
74 conn.commit()
75 return
76 except sa.Error as exc:
77 sa_exc = exc
79 # @TODO using strings for error checking, is there a better way?
81 if 'no such table' in str(sa_exc) and self.initDDL:
82 gws.log.warning(f'sqlitex: {self.dbPath}: error={sa_exc}, running init...')
83 try:
84 with self._engine().connect() as conn:
85 conn.execute(sa.text(self.initDDL))
86 conn.commit()
87 continue
88 except sa.Error as exc:
89 sa_exc = exc
91 if 'database is locked' in str(sa_exc):
92 gws.log.warning(f'sqlitex: {self.dbPath}: locked, waiting...')
93 gws.u.sleep(self._SLEEP_TIME)
94 continue
96 err_cnt += 1
97 if err_cnt < self._MAX_ERRORS:
98 gws.log.warning(f'sqlitex: {self.dbPath}: error={sa_exc}, waiting...')
99 gws.u.sleep(self._SLEEP_TIME)
100 continue
102 raise gws.Error(f'sqlitex: {self.dbPath}: fatal error') from sa_exc
104 def _engine(self):
105 if getattr(self, 'saEngine', None) is None:
106 self.saEngine = sa.create_engine(f'sqlite:///{self.dbPath}', poolclass=sa.NullPool, echo=False)
107 return self.saEngine