Coverage for gws-app/gws/lib/sqlitex/__init__.py: 75%

61 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 23:09 +0200

1"""Convenience wrapper for the SA SQLite engine. 

2 

3This wrapper accepts a database path and optionally an "init" DDL statement. 

4It executes queries given in a text form. 

5 

6A failed query is attempted up to 3 times in total to work around transient errors. A "database is locked" error is not counted against this limit: the wrapper waits and retries. 

7 

8If the error message is "no such table", the wrapper runs the "init" DDL before repeating. 

9 

10""" 

11 

12from typing import Optional 

13 

14import gws 

15import gws.lib.sa as sa 

16 

17 

class Error(gws.Error):
    """Error type of the sqlitex module."""

20 

21 

class Object:
    """SQLite database wrapper that executes text statements with retries.

    The SA engine is created lazily, on the first statement.
    """

    saEngine: sa.Engine

    def __init__(self, db_path: str, init_ddl: Optional[str] = ''):
        """Remember the database path and the optional "init" DDL.

        Args:
            db_path: Path of the SQLite database, used to build the engine URL.
            init_ddl: DDL statement to run when a statement fails with "no such table".
        """

        self.dbPath = db_path
        self.initDDL = init_ddl

    def execute(self, stmt: str, **params):
        """Execute a text DML statement and commit.

        Args:
            stmt: SQL text, may contain ``:name`` bind parameters.
            **params: Values for the bind parameters.
        """

        self._exec2(False, stmt, params)

    def select(self, stmt: str, **params) -> list[dict]:
        """Execute a text select statement.

        Args:
            stmt: SQL text, may contain ``:name`` bind parameters.
            **params: Values for the bind parameters.

        Returns:
            A list with one dict per result row.
        """

        return self._exec2(True, stmt, params)

    def insert(self, table_name: str, rec: dict):
        """Insert a new record (dict) into a table.

        Args:
            table_name: Table name, interpolated into the SQL text as is.
            rec: Column name -> value mapping. Keys are interpolated as is,
                values are passed as bind parameters.
        """

        keys = ','.join(rec)
        vals = ','.join(':' + k for k in rec)

        self._exec2(False, f'INSERT INTO {table_name} ({keys}) VALUES({vals})', rec)

    def update(self, table_name: str, rec: dict, uid):
        """Update a record (dict) in a table.

        Args:
            table_name: Table name, interpolated into the SQL text as is.
            rec: Column name -> new value mapping.
            uid: Value of the ``uid`` column of the row to update.
        """

        vals = ','.join(f'{k}=:{k}' for k in rec)
        # The WHERE value gets its own bind name: if it shared ':uid' with the SET part,
        # a 'uid' key in 'rec' would silently replace the row selector.
        where_key = self._WHERE_UID_PARAM
        self._exec2(
            False,
            f'UPDATE {table_name} SET {vals} WHERE uid=:{where_key}',
            {**rec, where_key: uid},
        )

    def delete(self, table_name: str, uid):
        """Delete a record by uid from a table.

        Args:
            table_name: Table name, interpolated into the SQL text as is.
            uid: Value of the ``uid`` column of the row to delete.
        """

        self._exec2(False, f'DELETE FROM {table_name} WHERE uid=:uid', {'uid': uid})

    ##

    # Number of counted failures after which a statement is given up.
    _MAX_ERRORS = 3
    # Pause between attempts, passed to gws.u.sleep (presumably seconds).
    _SLEEP_TIME = 0.1
    # Bind parameter name for the WHERE clause of 'update'.
    _WHERE_UID_PARAM = '_where_uid'

    def _exec2(self, is_select, stmt, params):
        """Run a statement on a fresh connection, retrying on SA errors.

        Args:
            is_select: If true, return the rows; otherwise commit and return None.
            stmt: SQL text.
            params: Dict of bind parameters.

        Returns:
            A list of row dicts for a select, None otherwise.

        Raises:
            Error: If the statement has failed ``_MAX_ERRORS`` times.
        """

        err_cnt = 0
        init_done = False

        while True:
            try:
                with self._engine().connect() as conn:
                    if is_select:
                        return [gws.u.to_dict(r) for r in conn.execute(sa.text(stmt), params)]
                    conn.execute(sa.text(stmt), params)
                    conn.commit()
                    return
            except sa.Error as exc:
                sa_exc = exc

            # @TODO using strings for error checking, is there a better way?

            # Run the init DDL at most once per call. If the table is still missing
            # after a successful init, repeating the DDL cannot help and would loop forever,
            # so the error is handled as an ordinary failure below.
            if 'no such table' in str(sa_exc) and self.initDDL and not init_done:
                gws.log.warning(f'sqlitex: {self.dbPath}: error={sa_exc}, running init...')
                try:
                    with self._engine().connect() as conn:
                        conn.execute(sa.text(self.initDDL))
                        conn.commit()
                    init_done = True
                    continue
                except sa.Error as exc:
                    # the init itself failed, handle its error like any other
                    sa_exc = exc

            # A locked database is not counted as a failure: wait and try again.
            if 'database is locked' in str(sa_exc):
                gws.log.warning(f'sqlitex: {self.dbPath}: locked, waiting...')
                gws.u.sleep(self._SLEEP_TIME)
                continue

            err_cnt += 1
            if err_cnt < self._MAX_ERRORS:
                gws.log.warning(f'sqlitex: {self.dbPath}: error={sa_exc}, waiting...')
                gws.u.sleep(self._SLEEP_TIME)
                continue

            # 'Error' derives from gws.Error, so callers catching gws.Error still work.
            raise Error(f'sqlitex: {self.dbPath}: fatal error') from sa_exc

    def _engine(self):
        """Return the SA engine for the database, creating it on first use."""

        if getattr(self, 'saEngine', None) is None:
            self.saEngine = sa.create_engine(f'sqlite:///{self.dbPath}', poolclass=sa.NullPool, echo=False)
        return self.saEngine