Coverage for gws-app / gws / lib / sqlitex / _test.py: 98%

161 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""Tests for the sqlitex module.""" 

2 

3import multiprocessing 

4import gws.lib.sqlitex as sqlitex 

5 

6 

def test_basic_insert_and_select(tmp_path):
    """Insert a single row and read it back with a parameterized SELECT."""
    init_ddl = """
        CREATE TABLE users (
            uid INTEGER PRIMARY KEY,
            name TEXT,
            email TEXT
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # Store one user
    db.insert('users', {'uid': 1, 'name': 'Alice', 'email': 'alice@example.com'})

    # Fetch the user by primary key and compare every column
    rows = db.select('SELECT * FROM users WHERE uid = :uid', uid=1)
    assert len(rows) == 1
    row = rows[0]
    assert row['uid'] == 1
    assert row['name'] == 'Alice'
    assert row['email'] == 'alice@example.com'

30 

31 

def test_update_operation(tmp_path):
    """Update an existing row and check that the new values are stored."""
    init_ddl = """
        CREATE TABLE users (
            uid INTEGER PRIMARY KEY,
            name TEXT,
            email TEXT
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # Seed the table with the row that will be modified
    db.insert('users', {'uid': 1, 'name': 'Bob', 'email': 'bob@example.com'})

    # Overwrite name and email of the row with uid 1
    db.update('users', {'name': 'Robert', 'email': 'robert@example.com'}, uid=1)

    # The row must now carry the updated values
    rows = db.select('SELECT * FROM users WHERE uid = :uid', uid=1)
    assert len(rows) == 1
    row = rows[0]
    assert row['name'] == 'Robert'
    assert row['email'] == 'robert@example.com'

57 

58 

def test_delete_operation(tmp_path):
    """Delete one of two rows and check that only the other one remains."""
    init_ddl = """
        CREATE TABLE users (
            uid INTEGER PRIMARY KEY,
            name TEXT
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # Two rows: the first gets deleted, the second must survive
    db.insert('users', {'uid': 1, 'name': 'Charlie'})
    db.insert('users', {'uid': 2, 'name': 'Diana'})

    # Remove the row with uid 1
    db.delete('users', uid=1)

    # Only the second row is left
    rows = db.select('SELECT * FROM users')
    assert len(rows) == 1
    survivor = rows[0]
    assert survivor['uid'] == 2
    assert survivor['name'] == 'Diana'

84 

85 

def test_execute_statement(tmp_path):
    """Run an INSERT through ``execute`` and read the row back."""
    init_ddl = """
        CREATE TABLE products (
            uid INTEGER PRIMARY KEY,
            name TEXT,
            price REAL
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # Insert via a raw statement with named parameters
    db.execute(
        'INSERT INTO products (uid, name, price) VALUES (:uid, :name, :price)',
        uid=1,
        name='Widget',
        price=9.99,
    )

    # The inserted row is visible to a subsequent select
    rows = db.select('SELECT * FROM products WHERE uid = :uid', uid=1)
    assert len(rows) == 1
    product = rows[0]
    assert product['name'] == 'Widget'
    assert product['price'] == 9.99

108 

109 

def test_auto_init_on_missing_table(tmp_path):
    """Check that the table from init_ddl is usable without an explicit setup step."""
    init_ddl = """
        CREATE TABLE items (
            uid INTEGER PRIMARY KEY,
            description TEXT
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # No table has been created by hand; this first statement
    # is expected to make init_ddl run
    db.insert('items', {'uid': 1, 'description': 'Test item'})

    # The insert landed in the freshly created table
    rows = db.select('SELECT * FROM items')
    assert len(rows) == 1
    assert rows[0]['description'] == 'Test item'

130 

131 

def test_multiple_inserts(tmp_path):
    """Insert five rows one by one and read them back in uid order."""
    init_ddl = """
        CREATE TABLE records (
            uid INTEGER PRIMARY KEY,
            value INTEGER
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # Rows with uid 1..5, each holding ten times its uid
    uids = range(1, 6)
    for uid in uids:
        db.insert('records', {'uid': uid, 'value': uid * 10})

    # Every row is present and comes back in ascending uid order
    rows = db.select('SELECT * FROM records ORDER BY uid')
    assert len(rows) == 5
    for expected_uid, row in zip(uids, rows):
        assert row['uid'] == expected_uid
        assert row['value'] == expected_uid * 10

155 

156 

def test_select_with_parameters(tmp_path):
    """Test select with named parameter bindings.

    Besides the number of rows, the test checks *which* rows come back,
    so a filter that returns the wrong two rows cannot pass unnoticed.
    """
    db_path = tmp_path / 'test.db'

    init_ddl = """
        CREATE TABLE employees (
            uid INTEGER PRIMARY KEY,
            name TEXT,
            department TEXT,
            salary REAL
        )
    """

    db = sqlitex.Object(str(db_path), init_ddl)

    # Insert test data
    db.insert('employees', {'uid': 1, 'name': 'Alice', 'department': 'Engineering', 'salary': 75000})
    db.insert('employees', {'uid': 2, 'name': 'Bob', 'department': 'Engineering', 'salary': 80000})
    db.insert('employees', {'uid': 3, 'name': 'Charlie', 'department': 'Sales', 'salary': 65000})

    # Test filtering by department: only the two engineers match
    results = db.select('SELECT * FROM employees WHERE department = :dept', dept='Engineering')
    assert len(results) == 2
    # The query has no ORDER BY, so compare names order-independently
    assert sorted(r['name'] for r in results) == ['Alice', 'Bob']

    # Test filtering by salary range: Charlie (65000) is below the threshold
    results = db.select('SELECT * FROM employees WHERE salary > :min_salary', min_salary=70000)
    assert len(results) == 2
    assert sorted(r['name'] for r in results) == ['Alice', 'Bob']

184 

185 

def test_empty_select(tmp_path):
    """Select from an empty table and expect an empty list."""
    init_ddl = """
        CREATE TABLE data (
            uid INTEGER PRIMARY KEY,
            value TEXT
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # Nothing was inserted, so the result must be a list with no rows
    rows = db.select('SELECT * FROM data')
    assert isinstance(rows, list)
    assert len(rows) == 0

203 

204 

def test_update_nonexistent_record(tmp_path):
    """Update a uid that matches no row; the call must pass and change nothing."""
    init_ddl = """
        CREATE TABLE items (
            uid INTEGER PRIMARY KEY,
            name TEXT
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # The table is empty, so uid 999 cannot match (no error expected)
    db.update('items', {'name': 'Updated'}, uid=999)

    # The update must not have created a row
    rows = db.select('SELECT * FROM items')
    assert len(rows) == 0

224 

225 

def test_delete_nonexistent_record(tmp_path):
    """Delete a uid that matches no row; the call must pass and change nothing."""
    init_ddl = """
        CREATE TABLE items (
            uid INTEGER PRIMARY KEY,
            name TEXT
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # The table is empty, so uid 999 cannot match (no error expected)
    db.delete('items', uid=999)

    # The table is as empty as before
    rows = db.select('SELECT * FROM items')
    assert len(rows) == 0

245 

246 

def test_without_init_ddl(tmp_path):
    """Use a database object constructed without init_ddl."""
    db_file = tmp_path / 'test.db'

    # Only the path is given, no DDL
    db = sqlitex.Object(str(db_file))

    # The schema is set up by hand instead
    create_sql = """
        CREATE TABLE IF NOT EXISTS simple (
            uid INTEGER PRIMARY KEY,
            data TEXT
        )
    """
    db.execute(create_sql)

    # The manually created table accepts and returns a row
    db.insert('simple', {'uid': 1, 'data': 'test'})
    rows = db.select('SELECT * FROM simple')
    assert len(rows) == 1

266 

267 

def test_complex_query(tmp_path):
    """Run an aggregating query with filter, grouping and ordering."""
    init_ddl = """
        CREATE TABLE orders (
            uid INTEGER PRIMARY KEY,
            customer TEXT,
            amount REAL,
            status TEXT
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # Two completed orders for John, one pending order for Jane
    db.insert('orders', {'uid': 1, 'customer': 'John', 'amount': 100.50, 'status': 'completed'})
    db.insert('orders', {'uid': 2, 'customer': 'Jane', 'amount': 250.75, 'status': 'pending'})
    db.insert('orders', {'uid': 3, 'customer': 'John', 'amount': 75.25, 'status': 'completed'})

    # Per-customer totals over completed orders only
    summary_sql = """
        SELECT customer, SUM(amount) as total, COUNT(*) as order_count
        FROM orders
        WHERE status = :status
        GROUP BY customer
        ORDER BY total DESC
        """
    rows = db.select(summary_sql, status='completed')

    # Jane's pending order is filtered out, leaving John's group alone
    assert len(rows) == 1
    summary = rows[0]
    assert summary['customer'] == 'John'
    assert summary['total'] == 175.75
    assert summary['order_count'] == 2

304 

305 

def test_special_characters_in_data(tmp_path):
    """Store text containing quotes and control characters and read it back unchanged."""
    init_ddl = """
        CREATE TABLE messages (
            uid INTEGER PRIMARY KEY,
            content TEXT
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # Single quotes, double quotes, a newline and a tab in one value
    special_text = 'Hello \'world\' with "quotes" and \n newlines \t tabs'
    db.insert('messages', {'uid': 1, 'content': special_text})

    # The stored text must equal the inserted one
    rows = db.select('SELECT * FROM messages WHERE uid = :uid', uid=1)
    assert len(rows) == 1
    assert rows[0]['content'] == special_text

327 

328 

def test_null_values(tmp_path):
    """Bind ``None`` to a column and expect ``None`` back on select."""
    init_ddl = """
        CREATE TABLE optional_data (
            uid INTEGER PRIMARY KEY,
            name TEXT,
            optional_field TEXT
        )
    """
    db_file = tmp_path / 'test.db'
    db = sqlitex.Object(str(db_file), init_ddl)

    # The :opt parameter is bound to None
    db.execute(
        'INSERT INTO optional_data (uid, name, optional_field) VALUES (:uid, :name, :opt)',
        uid=1,
        name='Test',
        opt=None,
    )

    # The regular column keeps its value, the optional one reads as None
    rows = db.select('SELECT * FROM optional_data WHERE uid = :uid', uid=1)
    assert len(rows) == 1
    row = rows[0]
    assert row['name'] == 'Test'
    assert row['optional_field'] is None

351 

352 

def test_reuse_database(tmp_path):
    """Write through one object and read the data through a second one on the same file."""
    init_ddl = """
        CREATE TABLE IF NOT EXISTS persistent (
            uid INTEGER PRIMARY KEY,
            value TEXT
        )
    """
    db_file = str(tmp_path / 'test.db')

    # The first object writes a row
    writer = sqlitex.Object(db_file, init_ddl)
    writer.insert('persistent', {'uid': 1, 'value': 'data1'})

    # A second object on the same path must see that row
    reader = sqlitex.Object(db_file, init_ddl)
    rows = reader.select('SELECT * FROM persistent')

    assert len(rows) == 1
    assert rows[0]['value'] == 'data1'

374 

375 

def _mp_worker(n, db_path, num_loops):
    """Increment the shared counter row ``num_loops`` times.

    Runs inside a child process of the concurrency test.

    Args:
        n: Worker number, written to ``last_updated_by`` on each update.
        db_path: Path of the database file holding the ``counter`` table.
        num_loops: Number of increments to perform.
    """

    # NOTE(review): connect_args are presumably passed on to the SQLite
    # connection by sqlitex - confirm against the module.
    connect_args = {'timeout': 0.0, 'isolation_level': None}
    db = sqlitex.Object(str(db_path), connect_args=connect_args)

    increment_sql = """
            UPDATE counter SET 
                value = value + 1, 
                last_updated_by = :pid 
            WHERE uid = 1
            """

    for _ in range(num_loops):
        db.execute(increment_sql, pid=n)

391 

392 

def test_concurrency(tmp_path):
    """Test concurrent writes using multiprocessing (true parallelism).

    Starts ``num_processes`` workers that each increment one counter row
    ``num_loops`` times, then checks that no increment was lost.
    """
    num_processes = 50
    num_loops = 10
    db_path = tmp_path / 'test.db'

    init_ddl = """
        CREATE TABLE IF NOT EXISTS counter (
            uid INTEGER PRIMARY KEY,
            value INTEGER,
            last_updated_by INTEGER
        )
    """

    db = sqlitex.Object(str(db_path), init_ddl)
    db.insert('counter', {'uid': 1, 'value': 0, 'last_updated_by': -1})

    ps = []

    for n in range(num_processes):
        p = multiprocessing.Process(target=_mp_worker, args=(n, db_path, num_loops))
        ps.append(p)
        p.start()

    for p in ps:
        p.join()

    # A worker that died with an exception has a non-zero exit code.
    # Report it directly instead of leaving only a counter mismatch.
    failed = {p.name: p.exitcode for p in ps if p.exitcode != 0}
    assert not failed, f'worker processes exited abnormally: {failed}'

    db = sqlitex.Object(str(db_path))

    results = db.select('SELECT value FROM counter WHERE uid = 1')
    expected_value = num_processes * num_loops
    # Guard the index access so a missing row fails as an assertion
    assert len(results) == 1
    assert results[0]['value'] == expected_value