Coverage for gws-app/gws/base/job/worker.py: 32%

19 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""Base job worker""" 

2 

3from typing import Optional 

4import gws 

5 

6 

class Object:
    """Base job worker bound to a root object, a user and, optionally, a job.

    Only the uid of the job is stored; the job itself is fetched from the
    job manager (``root.app.jobMgr``) each time it is needed.
    """

    def __init__(self, root: gws.Root, user: gws.User, job: Optional[gws.Job] = None):
        """Store the root, the user and the uid of ``job``.

        Args:
            root: Root object; its ``app.jobMgr`` is used to look up and update jobs.
            user: User passed to the job manager when looking up the job.
            job: Job this worker belongs to. If omitted (or falsy), ``jobUid`` is ``None``.
        """
        self.jobUid = job.uid if job else None
        self.root = root
        self.user = user

    def get_job(self) -> Optional[gws.Job]:
        """Fetch this worker's job from the job manager.

        Returns:
            ``None`` if the worker has no job uid, otherwise the job returned by
            ``jobMgr.get_job`` for the stored uid, the stored user and the
            ``gws.JobState.running`` state.

        Raises:
            gws.JobTerminated: If the job manager returns no job for the stored uid.
                The stored uid is reset before raising, so subsequent calls
                return ``None`` instead of raising again.
        """
        if not self.jobUid:
            return None

        job = self.root.app.jobMgr.get_job(
            self.jobUid,
            user=self.user,
            state=gws.JobState.running,
        )
        if job:
            return job

        # Capture the uid BEFORE resetting it: formatting ``self.jobUid`` after
        # the reset would make every message read "JOB 0 TERMINATED".
        terminated_uid = self.jobUid
        self.jobUid = 0
        raise gws.JobTerminated(f'JOB {terminated_uid!r} TERMINATED')

    def update_job(self, **kwargs):
        """Pass ``kwargs`` to ``jobMgr.update_job`` for this worker's job.

        Does nothing if ``get_job`` returns no job. A ``gws.JobTerminated``
        raised by ``get_job`` is not caught here and propagates to the caller.
        """
        job = self.get_job()
        if job:
            self.root.app.jobMgr.update_job(job, **kwargs)