Coverage for gws-app / gws / base / job / manager.py: 26%
122 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Job manager."""
3from typing import Optional
4import sys
6import gws
7import gws.lib.importer
8import gws.lib.jsonx
9import gws.lib.sqlitex
10import gws.lib.datetimex as dtx
11import gws.server.spool
class Object(gws.JobManager):
    """Job manager that keeps job records in a single SQLite table.

    Each job is one row in ``TABLE``, keyed by a random ``uid``. The database
    file is opened lazily on first use (see ``_db``).
    """

    TABLE = 'jobs'
    DDL = f"""
        CREATE TABLE IF NOT EXISTS {TABLE} (
            uid TEXT NOT NULL PRIMARY KEY,
            userUid TEXT DEFAULT '',
            userStr TEXT DEFAULT '',
            worker TEXT DEFAULT '',
            state TEXT DEFAULT '',
            error TEXT DEFAULT '',
            numSteps INTEGER DEFAULT 0,
            step INTEGER DEFAULT 0,
            stepName TEXT DEFAULT '',
            payload TEXT DEFAULT '',
            created INTEGER DEFAULT 0,
            updated INTEGER DEFAULT 0
        )
    """

    # Path of the SQLite file, set in ``configure``.
    dbPath: str

    def configure(self):
        """Resolve the database path from the ``path`` option or a versioned default."""
        # The default file name embeds the version with its last dot-separated part removed.
        ver = self.root.specs.version.rpartition('.')[0]
        self.dbPath = self.cfg('path', default=f'{gws.c.MISC_DIR}/jobs.{ver}.sqlite')

    def create_job(self, worker, user, payload=None):
        """Create a new job record in the ``open`` state and return it.

        Args:
            worker: Worker class; stored as ``<module file path>.<class name>``.
            user: Owner of the job.
            payload: Optional job payload; non-string values are stored as JSON.

        Returns:
            The freshly stored job.

        Raises:
            gws.Error: If the job cannot be read back after writing.
        """
        job_uid = gws.u.random_string(64)
        gws.log.debug(f'JOB {job_uid}: creating: {worker=} {user.uid=}')

        # Insert a bare row first, then fill it in through the common update path.
        self._db().insert(self.TABLE, dict(uid=job_uid))

        self._write(
            job_uid,
            dict(
                uid=job_uid,
                userUid=user.uid,
                userStr=self.root.app.authMgr.serialize_user(user),
                worker=sys.modules.get(worker.__module__).__file__ + '.' + worker.__name__,
                state=gws.JobState.open,
                payload=payload,
                created=gws.u.stime(),
                updated=gws.u.stime(),
            ),
        )

        return self._get_job_or_fail(job_uid)

    def get_job(self, job_uid: str, user=None, state=None):
        """Return the job with the given uid, or a falsy value (with an error log) on mismatch.

        Args:
            job_uid: Job identifier.
            user: If given, the job must belong to this user.
            state: If given, the job must be in this state.
        """
        job, msg = self._get_job(job_uid, user, state)
        if not job:
            gws.log.error(msg)
        return job

    def _get_job_or_fail(self, job_uid: str, user=None, state=None):
        """Like ``get_job``, but raise ``gws.Error`` instead of logging when the lookup fails."""
        job, msg = self._get_job(job_uid, user, state)
        if not job:
            raise gws.Error(msg)
        return job

    def _get_job(self, job_uid: str, user=None, state=None):
        """Load a job record and apply the optional user/state filters.

        Returns:
            A ``(job, message)`` pair: ``(job, '')`` on success,
            ``(None, <reason>)`` when the job is missing or a filter does not match.
        """
        rs = self._db().select(f'SELECT * FROM {self.TABLE} WHERE uid=:uid', uid=job_uid)
        if not rs:
            return None, f'JOB {job_uid}: not found'

        rec = rs[0]

        # Restore the owner from its serialized form; fall back to the guest user.
        job_user = None
        us = rec.get('userStr')
        if us:
            job_user = self.root.app.authMgr.unserialize_user(us)
        if not job_user:
            job_user = self.root.app.authMgr.guestUser

        if user and job_user.uid != user.uid:
            return None, f'JOB {job_uid}: wrong user {job_user.uid=} {user.uid=}'

        if state and rec.get('state') != state:
            return None, f'JOB {job_uid}: wrong state {rec.get("state")=} {state=}'

        job = gws.Job(
            uid=rec['uid'],
            user=job_user,
            worker=rec['worker'],
            state=rec['state'],
            error=rec['error'],
            numSteps=rec['numSteps'] or 0,
            step=rec['step'] or 0,
            stepName=rec['stepName'] or '',
            payload=gws.lib.jsonx.from_string(rec['payload'] or '{}'),
            timeCreated=dtx.from_timestamp(rec['created'] or 0),
            timeUpdated=dtx.from_timestamp(rec['updated'] or 0),
        )
        return job, ''

    def update_job(self, job, **kwargs) -> Optional[gws.Job]:
        """Write the given column values to the job's record and return the reloaded job.

        Returns:
            The updated job, or ``None`` if the job no longer exists.
        """
        job = self.get_job(job.uid)
        if not job:
            return
        self._write(job.uid, kwargs)
        return self._get_job_or_fail(job.uid)

    def _write(self, job_uid, rec):
        """Persist the column values in ``rec`` for the given job.

        The ``updated`` column is always set to the current time and a
        non-string ``payload`` is serialized to JSON. The caller's dict is
        left untouched.
        """
        # Work on a shallow copy so the caller's mapping is not modified.
        rec = dict(rec)
        rec['updated'] = gws.u.stime()
        p = rec.get('payload')
        if p is not None and not isinstance(p, str):
            rec['payload'] = gws.lib.jsonx.to_string(p)
        gws.log.debug(f'JOB {job_uid}: save {rec=}')
        self._db().update(self.TABLE, rec, job_uid)

    def schedule_job(self, job: gws.Job):
        """Hand an ``open`` job to the spooler if it is active, otherwise run it right away.

        Raises:
            gws.Error: If the job does not exist or is not in the ``open`` state.
        """
        job = self._get_job_or_fail(job.uid, state=gws.JobState.open)
        if gws.server.spool.is_active():
            gws.server.spool.add(job)
            return job
        return self.run_job(job)

    def run_job(self, job: gws.Job):
        """Claim an ``open`` job, execute its worker and return the reloaded job.

        Worker failures are recorded on the job (state ``error``) rather than propagated.

        Raises:
            gws.Error: If the job could not be claimed (missing or not ``open``).
        """
        job_uid = job.uid

        # atomically mark an 'open' job as 'running':
        # the conditional UPDATE tags the row with a random token only if it is
        # still 'open'; reading it back with that token fails if it was not ours.

        tmp = gws.u.random_string(64)
        self._db().execute(
            f'UPDATE {self.TABLE} SET state=:tmp WHERE uid=:uid AND state=:state',
            uid=job_uid,
            tmp=tmp,
            state=gws.JobState.open,
        )
        job = self._get_job_or_fail(job_uid, state=tmp)
        self._db().execute(
            f'UPDATE {self.TABLE} SET state=:s WHERE uid=:uid',
            uid=job.uid,
            s=gws.JobState.running,
        )
        job = self._get_job_or_fail(job_uid, state=gws.JobState.running)

        # now it's ours, let's run it

        try:
            # 'worker' was stored as '<module file path>.<class name>' by create_job.
            mod_path, _, fn_name = job.worker.rpartition('.')
            mod = gws.lib.importer.import_from_path(mod_path)
            worker_cls = getattr(mod, fn_name)
            worker_cls.run(self.root, job)
        except gws.JobTerminated as exc:
            # The exception may carry no arguments; do not fail inside the handler.
            reason = exc.args[0] if exc.args else ''
            gws.log.error(f'JOB {job_uid}: JobTerminated: {reason!r}')
            self.update_job(job, state=gws.JobState.error)
        except Exception as exc:
            gws.log.error(f'JOB {job_uid}: FAILED {exc=}')
            gws.log.exception()
            self.update_job(job, state=gws.JobState.error, error=repr(exc))

        return self.get_job(job_uid)

    def cancel_job(self, job: gws.Job) -> Optional[gws.Job]:
        """Set the job's state to ``cancel``; return the updated job or ``None`` if it is gone."""
        return self.update_job(job, state=gws.JobState.cancel)

    def remove_job(self, job: gws.Job):
        """Delete the job's record from the table."""
        self._db().delete(self.TABLE, job.uid)

    def require_job(self, req, p):
        """Return the job named by ``p.jobUid`` that belongs to the requesting user.

        Raises:
            gws.NotFoundError: If no such job exists for ``req.user``.
        """
        job = self.root.app.jobMgr.get_job(p.jobUid, req.user)
        if not job:
            raise gws.NotFoundError(f'JOB {p.jobUid}: not found')
        return job

    def handle_status_request(self, req, p):
        """Return a status response for the requested job."""
        job = self.require_job(req, p)
        return self.job_status_response(job)

    def handle_cancel_request(self, req, p):
        """Cancel the requested job and return its status response.

        Raises:
            gws.NotFoundError: If the job does not exist or vanished before it could be cancelled.
        """
        job = self.require_job(req, p)
        job = self.cancel_job(job)
        if not job:
            raise gws.NotFoundError(f'JOB {p.jobUid}: not found')
        return self.job_status_response(job)

    def job_status_response(self, job, **kwargs):
        """Build a ``gws.JobStatusResponse`` for the job; ``kwargs`` override the default fields."""
        d = dict(
            jobUid=job.uid,
            state=job.state,
            stepName=job.stepName or '',
            progress=self._get_progress(job),
            output={},
        )
        d.update(kwargs)
        return gws.JobStatusResponse(d)

    def _get_progress(self, job):
        """Return the job's progress as an integer percentage, capped at 100."""
        if job.state == gws.JobState.complete:
            return 100
        if job.state != gws.JobState.running:
            return 0
        if not job.numSteps:
            return 0
        return int(min(100.0, job.step * 100.0 / job.numSteps))

    ##

    # Lazily created database wrapper, see ``_db``.
    _sqlitex: gws.lib.sqlitex.Object

    def _db(self):
        """Return the SQLite wrapper, creating it on first use with ``dbPath`` and ``DDL``."""
        if getattr(self, '_sqlitex', None) is None:
            self._sqlitex = gws.lib.sqlitex.Object(self.dbPath, self.DDL)
        return self._sqlitex