Coverage for gws-app / gws / base / job / worker.py: 38%
21 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Base job worker"""
3from typing import Optional
4import gws
class Object:
    """Base class for job workers.

    Holds the application root, the user the worker acts for, and the uid
    of the job it is attached to (an empty string when there is no job).
    """

    jobUid: str
    user: gws.User
    root: gws.Root

    def __init__(self, root: gws.Root, user: gws.User, job: Optional[gws.Job] = None):
        """Create a worker.

        Args:
            root: Application root; its ``app.jobMgr`` is used for job access.
            user: User passed to the job manager on every lookup.
            job: Job to attach to. When omitted, the worker has no job and
                ``jobUid`` is the empty string.
        """
        self.jobUid = job.uid if job else ''
        self.root = root
        self.user = user

    def get_job(self) -> Optional[gws.Job]:
        """Fetch the attached job from the job manager.

        The lookup is restricted to ``self.user`` and to jobs in the
        ``gws.JobState.running`` state.

        Returns:
            The job, or ``None`` when the worker has no job attached.

        Raises:
            gws.JobTerminated: If a job is attached but the lookup returns
                nothing. ``jobUid`` is cleared before raising, so later calls
                return ``None`` instead of raising again.
        """
        if not self.jobUid:
            return None

        job = self.root.app.jobMgr.get_job(
            self.jobUid,
            user=self.user,
            state=gws.JobState.running,
        )
        if not job:
            # Remember the uid before clearing it, otherwise the error
            # message would always report an empty uid.
            uid = self.jobUid
            self.jobUid = ''
            raise gws.JobTerminated(f'JOB {uid!r} TERMINATED')
        return job

    def update_job(self, **kwargs):
        """Forward ``kwargs`` to the job manager's ``update_job`` for the attached job.

        Does nothing when no job is attached. Propagates
        ``gws.JobTerminated`` raised by ``get_job``.
        """
        job = self.get_job()
        if job:
            self.root.app.jobMgr.update_job(job, **kwargs)