Coverage for gws-app / gws / base / job / manager.py: 26%

122 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""Job manager.""" 

2 

3from typing import Optional 

4import sys 

5 

6import gws 

7import gws.lib.importer 

8import gws.lib.jsonx 

9import gws.lib.sqlitex 

10import gws.lib.datetimex as dtx 

11import gws.server.spool 

12 

13 

class Object(gws.JobManager):
    """Job manager that keeps job records in an SQLite table.

    Each job is one row in ``TABLE``, keyed by a random ``uid``. The row stores
    the serialized owner, a reference to the worker class, the current state,
    progress counters and a JSON-encoded payload.
    """

    TABLE = 'jobs'
    DDL = f"""
        CREATE TABLE IF NOT EXISTS {TABLE} (
            uid TEXT NOT NULL PRIMARY KEY,
            userUid TEXT DEFAULT '',
            userStr TEXT DEFAULT '',
            worker TEXT DEFAULT '',
            state TEXT DEFAULT '',
            error TEXT DEFAULT '',
            numSteps INTEGER DEFAULT 0,
            step INTEGER DEFAULT 0,
            stepName TEXT DEFAULT '',
            payload TEXT DEFAULT '',
            created INTEGER DEFAULT 0,
            updated INTEGER DEFAULT 0
        )
    """

    # Path of the SQLite database file, set in configure().
    dbPath: str

    def configure(self):
        """Resolve the database path from the configuration.

        The default file name embeds the application version with its last
        dot-separated component removed (everything before the final '.').
        """
        ver = self.root.specs.version.rpartition('.')[0]
        self.dbPath = self.cfg('path', default=f'{gws.c.MISC_DIR}/jobs.{ver}.sqlite')

    def create_job(self, worker, user, payload=None):
        """Create a new job record in the ``open`` state and return it.

        Args:
            worker: The worker class; stored as ``<module file path>.<class name>``.
            user: The job owner; its ``uid`` and serialized form are stored.
            payload: Optional payload; non-string values are JSON-encoded on write.

        Returns:
            The freshly stored job, re-read from the database.

        Raises:
            gws.Error: If the job cannot be read back after being written.
        """
        job_uid = gws.u.random_string(64)
        gws.log.debug(f'JOB {job_uid}: creating: {worker=} {user.uid=}')

        # insert a bare row first, then fill it in via the common update path
        self._db().insert(self.TABLE, dict(uid=job_uid))

        now = gws.u.stime()
        self._write(
            job_uid,
            dict(
                uid=job_uid,
                userUid=user.uid,
                userStr=self.root.app.authMgr.serialize_user(user),
                worker=sys.modules.get(worker.__module__).__file__ + '.' + worker.__name__,
                state=gws.JobState.open,
                payload=payload,
                created=now,
                updated=now,
            ),
        )

        return self._get_job_or_fail(job_uid)

    def get_job(self, job_uid: str, user=None, state=None):
        """Return the job with the given uid, or ``None`` if it does not match.

        Args:
            job_uid: Job uid.
            user: If given, the job must belong to this user (compared by ``uid``).
            state: If given, the job must be in this state.

        Returns:
            The job, or ``None``; the reason for a miss is logged as an error.
        """
        job, msg = self._get_job(job_uid, user, state)
        if not job:
            gws.log.error(msg)
        return job

    def _get_job_or_fail(self, job_uid: str, user=None, state=None):
        """Like ``get_job``, but raise ``gws.Error`` instead of returning ``None``."""
        job, msg = self._get_job(job_uid, user, state)
        if not job:
            raise gws.Error(msg)
        return job

    def _get_job(self, job_uid: str, user=None, state=None):
        """Load a job record and convert it to a ``gws.Job``.

        Returns:
            A ``(job, message)`` pair: ``(job, '')`` on success, or
            ``(None, reason)`` if the record does not exist, belongs to
            another user, or is in a different state than requested.
        """
        rs = self._db().select(f'SELECT * FROM {self.TABLE} WHERE uid=:uid', uid=job_uid)
        if not rs:
            return None, f'JOB {job_uid}: not found'

        rec = rs[0]

        # restore the owner; fall back to the guest user if none can be restored
        job_user = None
        us = rec.get('userStr')
        if us:
            job_user = self.root.app.authMgr.unserialize_user(us)
        if not job_user:
            job_user = self.root.app.authMgr.guestUser

        if user and job_user.uid != user.uid:
            return None, f'JOB {job_uid}: wrong user {job_user.uid=} {user.uid=}'

        if state and rec.get('state') != state:
            return None, f'JOB {job_uid}: wrong state {rec.get("state")=} {state=}'

        job = gws.Job(
            uid=rec['uid'],
            user=job_user,
            worker=rec['worker'],
            state=rec['state'],
            error=rec['error'],
            numSteps=rec['numSteps'] or 0,
            step=rec['step'] or 0,
            stepName=rec['stepName'] or '',
            payload=gws.lib.jsonx.from_string(rec['payload'] or '{}'),
            timeCreated=dtx.from_timestamp(rec['created'] or 0),
            timeUpdated=dtx.from_timestamp(rec['updated'] or 0),
        )
        return job, ''

    def update_job(self, job, **kwargs):
        """Update columns of an existing job.

        Args:
            job: The job to update (only its ``uid`` is used).
            **kwargs: Column values to write.

        Returns:
            The updated job re-read from the database, or ``None`` if the job
            no longer exists.
        """
        job = self.get_job(job.uid)
        if not job:
            return None
        self._write(job.uid, kwargs)
        return self._get_job_or_fail(job.uid)

    def _write(self, job_uid, rec):
        """Write the given column values to the job row.

        Always refreshes ``updated``; a non-string ``payload`` is JSON-encoded.
        """
        # work on a copy so the caller's dict is not modified
        rec = dict(rec)
        rec['updated'] = gws.u.stime()
        p = rec.get('payload')
        if p is not None and not isinstance(p, str):
            rec['payload'] = gws.lib.jsonx.to_string(p)
        gws.log.debug(f'JOB {job_uid}: save {rec=}')
        self._db().update(self.TABLE, rec, job_uid)

    def schedule_job(self, job: gws.Job):
        """Queue an ``open`` job in the spooler, or run it directly if no spooler is active.

        Raises:
            gws.Error: If the job does not exist or is not in the ``open`` state.
        """
        job = self._get_job_or_fail(job.uid, state=gws.JobState.open)
        if gws.server.spool.is_active():
            gws.server.spool.add(job)
            return job
        return self.run_job(job)

    def run_job(self, job: gws.Job):
        """Claim an ``open`` job, run its worker and return the resulting job.

        Exceptions raised by the worker are logged and recorded by setting the
        job state to ``error``; they are not propagated.

        Returns:
            The job as stored after the run, or ``None`` if it no longer exists.

        Raises:
            gws.Error: If the job could not be claimed (e.g. it was not ``open``).
        """
        job_uid = job.uid

        # atomically mark an 'open' job as 'running':
        # the conditional UPDATE only matches while the state is 'open', and the
        # random token lets us verify that it was our UPDATE that matched

        tmp = gws.u.random_string(64)
        self._db().execute(
            f'UPDATE {self.TABLE} SET state=:tmp WHERE uid=:uid AND state=:state',
            uid=job_uid,
            tmp=tmp,
            state=gws.JobState.open,
        )
        job = self._get_job_or_fail(job_uid, state=tmp)
        self._db().execute(
            f'UPDATE {self.TABLE} SET state=:s WHERE uid=:uid',
            uid=job.uid,
            s=gws.JobState.running,
        )
        job = self._get_job_or_fail(job_uid, state=gws.JobState.running)

        # now it's ours, let's run it

        try:
            # 'worker' is stored as '<module file path>.<class name>', see create_job
            mod_path, _, fn_name = job.worker.rpartition('.')
            mod = gws.lib.importer.import_from_path(mod_path)
            worker_cls = getattr(mod, fn_name)
            worker_cls.run(self.root, job)
        except gws.JobTerminated as exc:
            # the exception may carry no arguments; formatting the log line must
            # not fail, otherwise the job would stay in the 'running' state
            reason = exc.args[0] if exc.args else ''
            gws.log.error(f'JOB {job_uid}: JobTerminated: {reason!r}')
            self.update_job(job, state=gws.JobState.error)
        except Exception as exc:
            gws.log.error(f'JOB {job_uid}: FAILED {exc=}')
            gws.log.exception()
            self.update_job(job, state=gws.JobState.error, error=repr(exc))

        return self.get_job(job_uid)

    def cancel_job(self, job: gws.Job) -> Optional[gws.Job]:
        """Set the job state to ``cancel``; return the updated job or ``None`` if it is gone."""
        return self.update_job(job, state=gws.JobState.cancel)

    def remove_job(self, job: gws.Job):
        """Delete the job row from the database."""
        self._db().delete(self.TABLE, job.uid)

    def require_job(self, req, p):
        """Return the job ``p.jobUid`` owned by ``req.user``.

        Raises:
            gws.NotFoundError: If there is no such job for this user.
        """
        job = self.root.app.jobMgr.get_job(p.jobUid, req.user)
        if not job:
            raise gws.NotFoundError(f'JOB {p.jobUid}: not found')
        return job

    def handle_status_request(self, req, p):
        """Handle a status request: return the status response for the requested job.

        Raises:
            gws.NotFoundError: If the job is not found for the requesting user.
        """
        job = self.require_job(req, p)
        return self.job_status_response(job)

    def handle_cancel_request(self, req, p):
        """Handle a cancel request: cancel the requested job and return its status response.

        Raises:
            gws.NotFoundError: If the job is not found for the requesting user,
                or has disappeared before it could be cancelled.
        """
        job = self.require_job(req, p)
        job = self.cancel_job(job)
        if not job:
            raise gws.NotFoundError(f'JOB {p.jobUid}: not found')
        return self.job_status_response(job)

    def job_status_response(self, job, **kwargs):
        """Build a ``gws.JobStatusResponse`` for the job.

        Args:
            job: The job to report on.
            **kwargs: Extra or overriding response fields.
        """
        d = dict(
            jobUid=job.uid,
            state=job.state,
            stepName=job.stepName or '',
            progress=self._get_progress(job),
            output={},
        )
        d.update(kwargs)
        return gws.JobStatusResponse(d)

    def _get_progress(self, job):
        """Return the job progress as an integer percentage, capped at 100.

        A ``complete`` job is 100; a job that is not ``running`` or has no
        step count is 0; otherwise the value is ``step / numSteps`` in percent.
        """
        if job.state == gws.JobState.complete:
            return 100
        if job.state != gws.JobState.running:
            return 0
        if not job.numSteps:
            return 0
        return int(min(100.0, job.step * 100.0 / job.numSteps))

    ##

    # Database wrapper, created on first use by _db().
    _sqlitex: gws.lib.sqlitex.Object

    def _db(self):
        """Return the database wrapper, creating it on first use."""
        if getattr(self, '_sqlitex', None) is None:
            self._sqlitex = gws.lib.sqlitex.Object(self.dbPath, self.DDL)
        return self._sqlitex

217 return self._sqlitex