Coverage for gws-app/gws/base/job/manager.py: 27%

118 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""Job manager.""" 

2 

3from typing import Optional 

4import sys 

5 

6import gws 

7import gws.lib.importer 

8import gws.lib.jsonx 

9import gws.lib.sqlitex 

10import gws.lib.mime 

11import gws.server.spool 

12 

13 

class Object(gws.JobManager):
    """Job manager that keeps job records in an SQLite table.

    Each job is one row in ``TABLE``, keyed by a random ``uid``. The row stores
    the serialized owner, a reference to the worker class, the job state,
    progress counters and a JSON-encoded payload.
    """

    TABLE = 'jobs'

    # Schema of the jobs table; handed to the sqlite wrapper in ``_db``.
    DDL = f'''
        CREATE TABLE IF NOT EXISTS {TABLE} (
            uid TEXT NOT NULL PRIMARY KEY,
            userUid TEXT DEFAULT '',
            userStr TEXT DEFAULT '',
            worker TEXT DEFAULT '',
            state TEXT DEFAULT '',
            error TEXT DEFAULT '',
            numSteps INTEGER DEFAULT 0,
            step INTEGER DEFAULT 0,
            stepName TEXT DEFAULT '',
            payload TEXT DEFAULT '',
            created INTEGER DEFAULT 0,
            updated INTEGER DEFAULT 0
        )
    '''

    # Path of the SQLite database file, set in ``configure``.
    dbPath: str

    def configure(self):
        """Read the database path from the configuration (option ``path``)."""
        self.dbPath = self.cfg('path', default=f'{gws.c.MISC_DIR}/jobs82.sqlite')
        # NOTE: registration of the periodic task is currently disabled.
        # self.root.app.monitor.register_periodic_task(self)

    def periodic_task(self):
        """Periodic task hook; currently does nothing."""
        pass

    def create_job(self, user, worker, payload=None):
        """Create a new job in the ``open`` state and return it.

        Args:
            user: Owner of the job; its ``uid`` and serialized form are stored.
            worker: Worker class. It is stored as ``<module file>.<class name>``
                so that ``run_job`` can import it again later.
            payload: Optional job data, stored as JSON (``{}`` if omitted).

        Returns:
            The freshly created job, read back from the database.

        Raises:
            gws.Error: If the job cannot be read back after creation.
        """
        job_uid = gws.u.random_string(64)
        gws.log.debug(f'JOB {job_uid}: creating: {worker=} {user.uid=}')

        # Insert a bare row first; ``_write`` only updates existing rows.
        self._db().insert(self.TABLE, dict(uid=job_uid))

        self._write(job_uid, dict(
            uid=job_uid,
            userUid=user.uid,
            userStr=self.root.app.authMgr.serialize_user(user),
            worker=sys.modules.get(worker.__module__).__file__ + '.' + worker.__name__,
            state=gws.JobState.open,
            payload=payload,
            created=gws.u.stime(),
        ))

        return self.get_job_or_fail(job_uid)

    def get_job(self, job_uid: str, user=None, state=None) -> Optional[gws.Job]:
        """Fetch a job, logging an error and returning ``None`` on failure.

        Args:
            job_uid: Job uid.
            user: If given, the job must belong to this user.
            state: If given, the job must be in this state.

        Returns:
            The job, or ``None`` if it does not exist or does not match.
        """
        job, msg = self._get_job(job_uid, user, state)
        if not job:
            gws.log.error(msg)
        return job

    def get_job_or_fail(self, job_uid: str, user=None, state=None) -> gws.Job:
        """Fetch a job, raising on failure.

        Args:
            job_uid: Job uid.
            user: If given, the job must belong to this user.
            state: If given, the job must be in this state.

        Returns:
            The job.

        Raises:
            gws.Error: If the job does not exist or does not match.
        """
        job, msg = self._get_job(job_uid, user, state)
        if not job:
            raise gws.Error(msg)
        return job

    def _get_job(self, job_uid: str, user=None, state=None):
        """Load a job record and apply the optional user/state filters.

        Returns:
            A ``(job, message)`` pair: ``(job, '')`` on success, or
            ``(None, reason)`` if the job is missing or filtered out.
        """
        rs = self._db().select(f'SELECT * FROM {self.TABLE} WHERE uid=:uid', uid=job_uid)
        if not rs:
            return None, f'JOB {job_uid}: not found'

        rec = rs[0]

        # Restore the owner; fall back to the guest user if there is no
        # serialized user or it cannot be restored.
        job_user = None
        if rec.get('userStr'):
            job_user = self.root.app.authMgr.unserialize_user(rec.get('userStr'))
        if not job_user:
            job_user = self.root.app.authMgr.guestUser

        if user and job_user.uid != user.uid:
            return None, f'JOB {job_uid}: wrong user {job_user.uid=} {user.uid=}'

        if state and rec.get('state') != state:
            return None, f'JOB {job_uid}: wrong state {rec.get("state")=} {state=}'

        job = gws.Job(
            uid=rec['uid'],
            user=job_user,
            worker=rec['worker'],
            state=rec['state'],
            error=rec['error'],
            numSteps=rec['numSteps'] or 0,
            step=rec['step'] or 0,
            stepName=rec['stepName'] or '',
            payload=gws.lib.jsonx.from_string(rec['payload'] or '{}'),
        )
        return job, ''

    def update_job(self, job, **kwargs):
        """Update columns of an existing job.

        Args:
            job: The job to update; only its ``uid`` is used.
            **kwargs: Column values to write. A ``payload`` value is stored as
                JSON; if ``payload`` is not given, the stored payload is kept.

        Returns:
            The updated job read back from the database, or ``None`` if the
            job no longer exists (an error is logged in that case).

        Raises:
            gws.Error: If the job disappears between the write and the re-read.
        """
        job = self.get_job(job.uid)
        if not job:
            return
        self._write(job.uid, kwargs)
        return self.get_job_or_fail(job.uid)

    def _write(self, job_uid, rec):
        """Write ``rec`` to the job row, stamping the ``updated`` column.

        Args:
            job_uid: Job uid.
            rec: Mapping of column names to values. It is not modified.
        """
        # Work on a copy so the caller's dict (e.g. ``**kwargs``) stays intact.
        rec = dict(rec)
        rec['updated'] = gws.u.stime()
        # Serialize the payload only if the caller supplied one; otherwise a
        # plain state/progress update would overwrite the stored payload.
        if 'payload' in rec:
            rec['payload'] = gws.lib.jsonx.to_string(rec['payload'] or {})
        gws.log.debug(f'JOB {job_uid}: save {rec=}')
        self._db().update(self.TABLE, rec, job_uid)

    def schedule_job(self, job: gws.Job):
        """Queue an ``open`` job on the spooler, or run it right away.

        Args:
            job: The job to schedule; it must be in the ``open`` state.

        Returns:
            The job as queued if the spooler is active, otherwise the result
            of ``run_job``.

        Raises:
            gws.Error: If the job does not exist or is not ``open``.
        """
        job = self.get_job_or_fail(job.uid, state=gws.JobState.open)
        if gws.server.spool.is_active():
            gws.server.spool.add(job)
            return job
        return self.run_job(job)

    def run_job(self, job: gws.Job):
        """Claim an ``open`` job, mark it ``running`` and execute its worker.

        Worker failures do not propagate: they are logged and the job is put
        into the ``error`` state.

        Args:
            job: The job to run; only its ``uid`` is used.

        Returns:
            The job as stored after the run, or ``None`` if it no longer exists.

        Raises:
            gws.Error: If the job could not be claimed (it was not ``open``).
        """
        job_uid = job.uid

        # Claim the job: write a random token as its state, but only if it is
        # still 'open'. Reading it back with that token confirms that this
        # call made the change.
        tmp = gws.u.random_string(64)
        self._db().execute(
            f'UPDATE {self.TABLE} SET state=:tmp WHERE uid=:uid AND state=:state',
            uid=job_uid, tmp=tmp, state=gws.JobState.open
        )
        job = self.get_job_or_fail(job_uid, state=tmp)
        self._db().execute(
            f'UPDATE {self.TABLE} SET state=:s WHERE uid=:uid',
            uid=job.uid, s=gws.JobState.running
        )
        job = self.get_job_or_fail(job_uid, state=gws.JobState.running)

        # Now it's ours, let's run it.
        try:
            # ``worker`` is '<module file>.<class name>', see ``create_job``.
            mod_path, _, fn_name = job.worker.rpartition('.')
            mod = gws.lib.importer.import_from_path(mod_path)
            worker_cls = getattr(mod, fn_name)
            worker_cls.run(self.root, job)
        except gws.JobTerminated as exc:
            # The exception may have been raised without arguments; don't let
            # an IndexError here prevent the job from being marked as failed.
            reason = exc.args[0] if exc.args else None
            gws.log.error(f'JOB {job_uid}: JobTerminated: {reason!r}')
            self.update_job(job, state=gws.JobState.error)
        except Exception as exc:
            gws.log.error(f'JOB {job_uid}: FAILED {exc=}')
            gws.log.exception()
            self.update_job(job, state=gws.JobState.error, error=repr(exc))

        return self.get_job(job_uid)

    def cancel_job(self, job: gws.Job) -> Optional[gws.Job]:
        """Put the job into the ``cancel`` state.

        Returns:
            The updated job, or ``None`` if the job no longer exists.
        """
        return self.update_job(job, state=gws.JobState.cancel)

    def remove_job(self, job: gws.Job):
        """Delete the job's row from the database."""
        self._db().delete(self.TABLE, job.uid)

    def require_job(self, req, p):
        """Return the job ``p.jobUid`` owned by the requesting user.

        Args:
            req: Request object; ``req.user`` must own the job.
            p: Request parameters with a ``jobUid`` attribute.

        Raises:
            gws.NotFoundError: If there is no such job for this user.
        """
        job = self.root.app.jobMgr.get_job(p.jobUid, req.user)
        if not job:
            raise gws.NotFoundError(f'JOB {p.jobUid}: not found')
        return job

    def handle_status_request(self, req, p):
        """Handle a status request: return the status of the requested job.

        Raises:
            gws.NotFoundError: If there is no such job for this user.
        """
        job = self.require_job(req, p)
        return self.job_status_response(job)

    def handle_cancel_request(self, req, p):
        """Handle a cancel request: cancel the job and return its status.

        Raises:
            gws.NotFoundError: If there is no such job for this user, or the
                job disappeared before it could be cancelled.
        """
        job = self.require_job(req, p)
        job = self.cancel_job(job)
        if not job:
            # ``cancel_job`` returns None if the job vanished in the meantime.
            raise gws.NotFoundError(f'JOB {p.jobUid}: not found')
        return self.job_status_response(job)

    def job_status_response(self, job, **kwargs):
        """Build a status response for a job.

        Args:
            job: The job to report on.
            **kwargs: Extra response fields; these override the defaults.

        Returns:
            A ``gws.JobStatusResponse``.
        """
        d = dict(
            jobUid=job.uid,
            state=job.state,
            stepName=job.stepName or '',
            progress=self._get_progress(job),
            output={}
        )
        d.update(kwargs)
        return gws.JobStatusResponse(d)

    def _get_progress(self, job):
        """Return the job's progress as an integer percentage (0..100).

        Complete jobs report 100. Jobs that are not running, or have no
        step count, report 0.
        """
        if job.state == gws.JobState.complete:
            return 100
        if job.state != gws.JobState.running:
            return 0
        if not job.numSteps:
            return 0
        return int(min(100.0, job.step * 100.0 / job.numSteps))

    ##

    # Database wrapper, created on first use by ``_db``.
    _sqlitex: gws.lib.sqlitex.Object

    def _db(self):
        """Return the sqlite wrapper for ``dbPath``, creating it on first use."""
        if getattr(self, '_sqlitex', None) is None:
            self._sqlitex = gws.lib.sqlitex.Object(self.dbPath, self.DDL)
        return self._sqlitex