Coverage for gws-app/gws/base/action/manager.py: 45%

85 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1from typing import Optional 

2import gws 

3import gws.spec.runtime 

4 

5 

def get_action_for_cli(root: gws.Root, action_name: str, project_uid: Optional[str] = None) -> Optional[gws.Action]:
    """Get an action object by its name and optional project UID.

    If a project UID is provided, only that project is asked for the action
    and the search stops there, whether or not the action was found.
    If no project UID is provided, the action is searched in the global scope
    and then in all projects.

    All lookups are performed on behalf of the system user.

    Args:
        root: The root object; its ``app`` provides projects, the action
            manager and the auth manager.
        action_name: Name of the action, passed to ``find_action``.
        project_uid: Optional UID of the project to look in.

    Returns:
        The action object, or ``None`` if the project or the action was not
        found. Every ``None`` result is preceded by an error log entry.
    """

    action_mgr = root.app.actionMgr
    system_user = root.app.authMgr.systemUser

    if project_uid:
        project = root.app.project(project_uid)
        if not project:
            gws.log.error(f'project {project_uid!r} not found')
            return None
        act = action_mgr.find_action(project, action_name, system_user)
        if act:
            return act
        gws.log.error(f'action {action_name!r} not found in {project_uid!r}')
        # An explicitly requested project must not silently resolve to an
        # action from some other project, so stop here instead of falling
        # through to the project-wide scan below.
        return None

    # No project given: try the global scope first...
    act = action_mgr.find_action(None, action_name, system_user)
    if act:
        return act

    # ...then take the first project that provides the action.
    for project in root.app.projects:
        act = action_mgr.find_action(project, action_name, system_user)
        if act:
            gws.log.info(f'using action {action_name!r} from {project.uid!r}')
            return act

    gws.log.error(f'action {action_name!r} not found')
    return None

33 

34 

def parse_cli_request(
    root: gws.Root,
    command_category: gws.CommandCategory,
    command_name: str,
    params: dict,
    read_options=None,
):
    """Resolve a CLI command to a bound handler method and a parsed request.

    The command descriptor is looked up in ``root.specs``; ``params`` are then
    read against the descriptor's argument type. The handler is taken from a
    freshly created instance of the descriptor's owner class.

    Args:
        root: The root object providing ``specs``.
        command_category: Category of the command to look up.
        command_name: Name of the command to look up.
        params: Raw parameters to be read into a request object.
        read_options: Options forwarded to ``root.specs.read``.

    Returns:
        A ``(handler, request)`` tuple.

    Raises:
        gws.NotFoundError: If there is no descriptor for the command.
        gws.BadRequestError: If the parameters cannot be read.
    """

    specs = root.specs

    desc = specs.command_descriptor(command_category, command_name)
    if not desc:
        raise gws.NotFoundError(f'{command_category}.{command_name} not found')

    try:
        request = specs.read(params, desc.tArg, options=read_options)
    except gws.spec.runtime.ReadError as exc:
        raise gws.BadRequestError(f'read error: {exc!r}') from exc

    # The owner class is instantiated without arguments for each request.
    owner_cls = gws.u.require(specs.get_class(desc.tOwner))
    owner = owner_cls()

    return getattr(owner, desc.methodName), request

58 

59 

class Object(gws.ActionManager):
    """Action manager: finds actions in projects and in the application."""

    def actions_for_project(self, project, user):
        """Return the actions ``user`` can use in ``project``, keyed by ``extType``.

        Usable project actions are collected first; application actions are
        added only for types the project did not already provide.
        """
        by_type = {a.extType: a for a in project.actions if user.can_use(a)}

        for a in self.root.app.actions:
            if a.extType in by_type:
                # A project action of this type takes precedence.
                continue
            if user.can_use(a):
                by_type[a.extType] = a

        return list(by_type.values())

    def prepare_action(self, command_category, command_name, params, user, read_options=None):
        """Resolve a command to a bound action method and a parsed request.

        If the request carries a ``projectUid``, the action is first looked up
        in that project, then in the application.

        Returns:
            A ``(method, request)`` tuple.

        Raises:
            gws.NotFoundError: If the command, the project or the action is not found.
            gws.BadRequestError: If ``params`` cannot be read.
            gws.ForbiddenError: If the user may not use the project or the action.
        """
        specs = self.root.specs

        desc = specs.command_descriptor(command_category, command_name)
        if not desc:
            raise gws.NotFoundError(f'{command_category}.{command_name} not found')

        try:
            request = gws.u.require(specs.read(params, desc.tArg, options=read_options))
        except gws.spec.runtime.ReadError as exc:
            raise gws.BadRequestError(f'read error: {exc!r}') from exc

        action = None

        project_uid = request.get('projectUid')
        if project_uid:
            project = self.root.app.project(project_uid)
            if not project:
                raise gws.NotFoundError(f'project not found: {project_uid!r}')
            if not user.can_use(project):
                raise gws.ForbiddenError(f'project forbidden: {project_uid!r}')
            action = self._find_by_ext_name(project, desc.owner.extName, user)

        # Fall back to the application-level actions.
        action = action or self._find_by_ext_name(self.root.app, desc.owner.extName, user)

        if not action:
            raise gws.NotFoundError(f'action {desc.owner.extName!r}: not found')

        return getattr(action, desc.methodName), request

    def find_action(self, project, ext_type, user):
        """Find an action by ``extType`` in ``project`` (if given), then in the application."""
        in_project = self._find_by_ext_type(project, ext_type, user) if project else None
        return in_project or self._find_by_ext_type(self.root.app, ext_type, user)

    # @TODO build indexes for this

    def _find_by_ext_name(self, obj, ext_name: str, user: gws.User):
        """Return the first action of ``obj`` whose ``extName`` matches, or ``None``.

        Raises:
            gws.ForbiddenError: If that action exists but ``user`` cannot use it.
        """
        for a in obj.actions:
            if a.extName != ext_name:
                continue
            if user.can_use(a):
                return a
            raise gws.ForbiddenError(f'action {ext_name!r}: forbidden in {obj!r}')
        return None

    def _find_by_ext_type(self, obj, ext_type: str, user: gws.User):
        """Return the first action of ``obj`` whose ``extType`` matches, or ``None``.

        Raises:
            gws.ForbiddenError: If that action exists but ``user`` cannot use it.
        """
        match = next((a for a in obj.actions if a.extType == ext_type), None)
        if match is None:
            return None
        if not user.can_use(match):
            raise gws.ForbiddenError(f'action {ext_type!r}: forbidden in {obj!r}')
        return match