Coverage for gws-app / gws / lib / sqlitex / __init__.py: 81%

62 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""Convenience wrapper for the SA SQLite engine. 

2 

3This wrapper accepts a database path and optionally an "init" DDL statement. 

4It executes queries given in a text form. 

5 

6Failed queries are repeated up to 3 times to work around transient errors, like the DB being locked. 

7 

8If the error message is "no such table", the wrapper runs the "init" DDL before repeating. 

9 

10""" 

11 

12from typing import Optional 

13 

14import gws 

15import gws.lib.sa as sa 

16 

17 

18class Error(gws.Error): 

19 pass 

20 

21 

22class Object: 

23 saEngine: sa.Engine 

24 

25 def __init__(self, db_path: str, init_ddl: Optional[str] = '', connect_args: Optional[dict] = None): 

26 self.dbPath = db_path 

27 self.initDDL = init_ddl 

28 self.connectArgs = connect_args or {} 

29 

30 def execute(self, stmt: str, **params): 

31 """Execute a text DML statement and commit.""" 

32 

33 self._exec2(False, stmt, params) 

34 

35 def select(self, stmt: str, **params) -> list[dict]: 

36 """Execute a text select statement.""" 

37 

38 return self._exec2(True, stmt, params) 

39 

40 def insert(self, table_name: str, rec: dict): 

41 """Insert a new record (dict) into a table.""" 

42 

43 keys = ','.join(rec) 

44 vals = ','.join(':' + k for k in rec) 

45 

46 self._exec2(False, f'INSERT INTO {table_name} ({keys}) VALUES({vals})', rec) 

47 

48 def update(self, table_name: str, rec: dict, uid): 

49 """Update a record (dict) in a table.""" 

50 

51 vals = ','.join(f'{k}=:{k}' for k in rec) 

52 self._exec2(False, f'UPDATE {table_name} SET {vals} WHERE uid=:uid', {'uid': uid, **rec}) 

53 

54 def delete(self, table_name: str, uid): 

55 """Delete a record by uid from a table.""" 

56 

57 self._exec2(False, f'DELETE FROM {table_name} WHERE uid=:uid', {'uid': uid}) 

58 

59 ## 

60 

61 _MAX_ERRORS = 3 

62 _SLEEP_TIME = 0.1 

63 

64 def _exec2(self, is_select, stmt, params): 

65 err_cnt = 0 

66 

67 while True: 

68 sa_exc = None 

69 

70 try: 

71 with self._engine().connect() as conn: 

72 if is_select: 

73 return [gws.u.to_dict(r) for r in conn.execute(sa.text(stmt), params)] 

74 conn.execute(sa.text(stmt), params) 

75 conn.commit() 

76 return [] 

77 except sa.Error as exc: 

78 sa_exc = exc 

79 

80 # @TODO using strings for error checking, is there a better way? 

81 

82 if 'no such table' in str(sa_exc) and self.initDDL: 

83 gws.log.warning(f'sqlitex: {self.dbPath}: {sa_exc}, running init...') 

84 try: 

85 with self._engine().connect() as conn: 

86 conn.execute(sa.text(self.initDDL)) 

87 conn.commit() 

88 continue 

89 except sa.Error as exc: 

90 sa_exc = exc 

91 

92 if 'database is locked' in str(sa_exc): 

93 gws.log.warning(f'sqlitex: {self.dbPath}: locked, waiting...') 

94 gws.u.sleep(self._SLEEP_TIME) 

95 continue 

96 

97 err_cnt += 1 

98 if err_cnt < self._MAX_ERRORS: 

99 gws.log.warning(f'sqlitex: {self.dbPath}: {sa_exc}, waiting...') 

100 gws.u.sleep(self._SLEEP_TIME) 

101 continue 

102 

103 raise gws.Error(f'sqlitex: {self.dbPath}: {sa_exc}') from sa_exc 

104 

105 def _engine(self): 

106 if getattr(self, 'saEngine', None) is None: 

107 self.saEngine = sa.create_engine( 

108 f'sqlite:///{self.dbPath}', 

109 poolclass=sa.NullPool, 

110 echo=False, 

111 connect_args=self.connectArgs, 

112 ) 

113 return self.saEngine