Coverage for gws-app / gws / base / job / worker.py: 38%

21 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""Base job worker""" 

2 

3from typing import Optional 

4import gws 

5 

6 

class Object:
    """Base job worker, bound to a root object, a user and optionally a job."""

    jobUid: str
    root: gws.Root
    user: gws.User

    def __init__(self, root: gws.Root, user: gws.User, job: Optional[gws.Job] = None):
        """Initialize the worker.

        Args:
            root: The root object; its ``app.jobMgr`` is used to fetch and update the job.
            user: The user passed to the job manager when the job is fetched.
            job: The job to work on. When omitted, ``jobUid`` is set to an empty string.
        """
        self.jobUid = job.uid if job else ''
        self.root = root
        self.user = user

    def get_job(self) -> Optional[gws.Job]:
        """Fetch this worker's job from the job manager, requesting the running state.

        Returns:
            The job, or ``None`` when the worker has no job uid.

        Raises:
            gws.JobTerminated: If the job manager returns nothing for the stored uid.
                The stored uid is cleared before raising, so subsequent calls
                return ``None``.
        """
        if not self.jobUid:
            return None

        job = self.root.app.jobMgr.get_job(
            self.jobUid,
            user=self.user,
            state=gws.JobState.running,
        )
        if not job:
            # Keep the uid for the error message before clearing it; formatting
            # the message after the reset would always report an empty uid.
            terminated_uid = self.jobUid
            self.jobUid = ''
            raise gws.JobTerminated(f'JOB {terminated_uid!r} TERMINATED')
        return job

    def update_job(self, **kwargs):
        """Update this worker's job through the job manager.

        Does nothing when ``get_job`` returns no job.

        Args:
            **kwargs: Passed through unchanged to the job manager's ``update_job``.

        Raises:
            gws.JobTerminated: Propagated from ``get_job``.
        """
        job = self.get_job()
        if job:
            self.root.app.jobMgr.update_job(job, **kwargs)