Coverage for gws-app/gws/base/job/worker.py: 32%
19 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""Base job worker"""
3from typing import Optional
4import gws
7class Object:
8 def __init__(self, root: gws.Root, user: gws.User, job: Optional[gws.Job] = None):
9 self.jobUid = job.uid if job else None
10 self.root = root
11 self.user = user
13 def get_job(self) -> Optional[gws.Job]:
14 if not self.jobUid:
15 return
17 job = self.root.app.jobMgr.get_job(
18 self.jobUid,
19 user=self.user,
20 state=gws.JobState.running
21 )
22 if not job:
23 self.jobUid = 0
24 raise gws.JobTerminated(f'JOB {self.jobUid!r} TERMINATED')
25 return job
27 def update_job(self, **kwargs):
28 job = self.get_job()
29 if job:
30 self.root.app.jobMgr.update_job(job, **kwargs)