Coverage for gws-app/gws/base/job/manager.py: 27%
118 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""Job manager."""
3from typing import Optional
4import sys
6import gws
7import gws.lib.importer
8import gws.lib.jsonx
9import gws.lib.sqlitex
10import gws.lib.mime
11import gws.server.spool
14class Object(gws.JobManager):
15 TABLE = 'jobs'
16 DDL = f'''
17 CREATE TABLE IF NOT EXISTS {TABLE} (
18 uid TEXT NOT NULL PRIMARY KEY,
19 userUid TEXT DEFAULT '',
20 userStr TEXT DEFAULT '',
21 worker TEXT DEFAULT '',
22 state TEXT DEFAULT '',
23 error TEXT DEFAULT '',
24 numSteps INTEGER DEFAULT 0,
25 step INTEGER DEFAULT 0,
26 stepName TEXT DEFAULT '',
27 payload TEXT DEFAULT '',
28 created INTEGER DEFAULT 0,
29 updated INTEGER DEFAULT 0
30 )
31 '''
33 dbPath: str
35 def configure(self):
36 self.dbPath = self.cfg('path', default=f'{gws.c.MISC_DIR}/jobs82.sqlite')
37 # self.root.app.monitor.register_periodic_task(self)
39 def periodic_task(self):
40 pass
42 def create_job(self, user, worker, payload=None):
43 job_uid = gws.u.random_string(64)
44 gws.log.debug(f'JOB {job_uid}: creating: {worker=} {user.uid=}')
46 self._db().insert(self.TABLE, dict(uid=job_uid))
48 self._write(job_uid, dict(
49 uid=job_uid,
50 userUid=user.uid,
51 userStr=self.root.app.authMgr.serialize_user(user),
52 worker=sys.modules.get(worker.__module__).__file__ + '.' + worker.__name__,
53 state=gws.JobState.open,
54 payload=payload,
55 created=gws.u.stime(),
56 ))
58 return self.get_job_or_fail(job_uid)
60 def get_job(self, job_uid: str, user=None, state=None):
61 job, msg = self._get_job(job_uid, user, state)
62 if not job:
63 gws.log.error(msg)
64 return job
66 def get_job_or_fail(self, job_uid: str, user=None, state=None):
67 job, msg = self._get_job(job_uid, user, state)
68 if not job:
69 raise gws.Error(msg)
70 return job
72 def _get_job(self, job_uid: str, user=None, state=None):
73 rs = self._db().select(f'SELECT * FROM {self.TABLE} WHERE uid=:uid', uid=job_uid)
74 if not rs:
75 return None, f'JOB {job_uid}: not found'
77 rec = rs[0]
79 job_user = None
80 if rec.get('userStr'):
81 job_user = self.root.app.authMgr.unserialize_user(rec.get('userStr'))
82 if not job_user:
83 job_user = self.root.app.authMgr.guestUser
85 if user and job_user.uid != user.uid:
86 return None, f'JOB {job_uid}: wrong user {job_user.uid=} {user.uid=}'
88 if state and rec.get('state') != state:
89 return None, f'JOB {job_uid}: wrong state {rec.get("state")=} {state=}'
91 job = gws.Job(
92 uid=rec['uid'],
93 user=job_user,
94 worker=rec['worker'],
95 state=rec['state'],
96 error=rec['error'],
97 numSteps=rec['numSteps'] or 0,
98 step=rec['step'] or 0,
99 stepName=rec['stepName'] or '',
100 payload=gws.lib.jsonx.from_string(rec['payload'] or '{}'),
101 )
102 return job, ''
104 def update_job(self, job, **kwargs):
105 job = self.get_job(job.uid)
106 if not job:
107 return
108 self._write(job.uid, kwargs)
109 return self.get_job_or_fail(job.uid)
111 def _write(self, job_uid, rec):
112 rec['updated'] = gws.u.stime()
113 rec['payload'] = gws.lib.jsonx.to_string(rec.get('payload') or {})
114 gws.log.debug(f'JOB {job_uid}: save {rec=}')
115 self._db().update(self.TABLE, rec, job_uid)
117 def schedule_job(self, job: gws.Job):
118 job = self.get_job_or_fail(job.uid, state=gws.JobState.open)
119 if gws.server.spool.is_active():
120 gws.server.spool.add(job)
121 return job
122 return self.run_job(job)
124 def run_job(self, job: gws.Job):
125 job_uid = job.uid
127 # atomically mark an 'open' job as 'running'
129 tmp = gws.u.random_string(64)
130 self._db().execute(
131 f'UPDATE {self.TABLE} SET state=:tmp WHERE uid=:uid AND state=:state',
132 uid=job_uid, tmp=tmp, state=gws.JobState.open
133 )
134 job = self.get_job_or_fail(job_uid, state=tmp)
135 self._db().execute(
136 f'UPDATE {self.TABLE} SET state=:s WHERE uid=:uid',
137 uid=job.uid, s=gws.JobState.running
138 )
139 job = self.get_job_or_fail(job_uid, state=gws.JobState.running)
141 # now it's ours, let's run it
143 try:
144 mod_path, _, fn_name = job.worker.rpartition('.')
145 mod = gws.lib.importer.import_from_path(mod_path)
146 worker_cls = getattr(mod, fn_name)
147 worker_cls.run(self.root, job)
148 except gws.JobTerminated as exc:
149 gws.log.error(f'JOB {job_uid}: JobTerminated: {exc.args[0]!r}')
150 self.update_job(job, state=gws.JobState.error)
151 except Exception as exc:
152 gws.log.error(f'JOB {job_uid}: FAILED {exc=}')
153 gws.log.exception()
154 self.update_job(job, state=gws.JobState.error, error=repr(exc))
156 return self.get_job(job_uid)
158 def cancel_job(self, job: gws.Job) -> Optional[gws.Job]:
159 return self.update_job(job, state=gws.JobState.cancel)
161 def remove_job(self, job: gws.Job):
162 self._db().delete(self.TABLE, job.uid)
164 def require_job(self, req, p):
165 job = self.root.app.jobMgr.get_job(p.jobUid, req.user)
166 if not job:
167 raise gws.NotFoundError(f'JOB {p.jobUid}: not found')
168 return job
170 def handle_status_request(self, req, p):
171 job = self.require_job(req, p)
172 return self.job_status_response(job)
174 def handle_cancel_request(self, req, p):
175 job = self.require_job(req, p)
176 job = self.cancel_job(job)
177 return self.job_status_response(job)
179 def job_status_response(self, job, **kwargs):
180 d = dict(
181 jobUid=job.uid,
182 state=job.state,
183 stepName=job.stepName or '',
184 progress=self._get_progress(job),
185 output={}
186 )
187 d.update(kwargs)
188 return gws.JobStatusResponse(d)
190 def _get_progress(self, job):
191 if job.state == gws.JobState.complete:
192 return 100
193 if job.state != gws.JobState.running:
194 return 0
195 if not job.numSteps:
196 return 0
197 return int(min(100.0, job.step * 100.0 / job.numSteps))
199 ##
201 _sqlitex: gws.lib.sqlitex.Object
203 def _db(self):
204 if getattr(self, '_sqlitex', None) is None:
205 self._sqlitex = gws.lib.sqlitex.Object(self.dbPath, self.DDL)
206 return self._sqlitex