Coverage for gws-app / gws / base / action / manager.py: 45%

87 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1from typing import Optional 

2import gws 

3import gws.spec.runtime 

4 

5 

6def get_action_for_cli(root: gws.Root, action_name: str, project_uid: Optional[str] = None) -> Optional[gws.Action]: 

7 """Get an action object by its name and optional project UID. 

8 

9 If no project UID is provided, it searches for the action in the global scope and then in all projects. 

10 """ 

11 

12 if project_uid: 

13 project = root.app.project(project_uid) 

14 if not project: 

15 gws.log.error(f'project {project_uid!r} not found') 

16 return None 

17 act = root.app.actionMgr.find_action(project, action_name, root.app.authMgr.systemUser) 

18 if act: 

19 return act 

20 gws.log.error(f'action {action_name!r} not found in {project_uid!r}') 

21 

22 act = root.app.actionMgr.find_action(None, action_name, root.app.authMgr.systemUser) 

23 if act: 

24 return act 

25 

26 for project in root.app.projects: 

27 act = root.app.actionMgr.find_action(project, action_name, root.app.authMgr.systemUser) 

28 if act: 

29 gws.log.info(f'using action {action_name!r} from {project.uid!r}') 

30 return act 

31 

32 gws.log.error(f'action {action_name!r} not found') 

33 

34 

35def parse_cli_request( 

36 root: gws.Root, 

37 command_category: gws.CommandCategory, 

38 command_name: str, 

39 params: dict, 

40 read_options=None, 

41): 

42 """Parse a CLI request and return the action handler and request object.""" 

43 

44 desc = root.specs.command_descriptor(command_category, command_name) 

45 if not desc: 

46 raise gws.NotFoundError(f'{command_category}.{command_name} not found') 

47 

48 try: 

49 request = root.specs.read(params, desc.tArg, options=read_options) 

50 except gws.spec.runtime.ReadError as exc: 

51 raise gws.BadRequestError(f'read error: {exc!r}') from exc 

52 

53 cls = gws.u.require(root.specs.get_class(desc.tOwner)) 

54 action = cls() 

55 

56 fn = getattr(action, desc.methodName) 

57 return fn, request 

58 

59 

60class Object(gws.ActionManager): 

61 def actions_for_project(self, project, user): 

62 d = {} 

63 

64 for a in project.actions: 

65 if user.can_use(a): 

66 d[a.extType] = a 

67 

68 for a in self.root.app.actions: 

69 if a.extType not in d and user.can_use(a): 

70 d[a.extType] = a 

71 

72 return list(d.values()) 

73 

74 def prepare_action(self, command_category, command_name, params, user, read_options=None): 

75 desc = self.root.specs.command_descriptor(command_category, command_name) 

76 if not desc: 

77 raise gws.NotFoundError(f'{command_category}.{command_name} not found') 

78 

79 if desc.extCommandCategory == gws.CommandCategory.raw: 

80 request = gws.Request( 

81 projectUid=params.get('projectUid'), 

82 localeUid=params.get('localeUid'), 

83 ) 

84 else: 

85 try: 

86 request = gws.u.require(self.root.specs.read(params, desc.tArg, options=read_options)) 

87 except gws.spec.runtime.ReadError as exc: 

88 raise gws.BadRequestError(f'read error: {exc!r}') from exc 

89 

90 action = None 

91 

92 project_uid = request.get('projectUid') 

93 if project_uid: 

94 project = self.root.app.project(project_uid) 

95 if not project: 

96 raise gws.NotFoundError(f'project not found: {project_uid!r}') 

97 if not user.can_use(project): 

98 raise gws.ForbiddenError(f'project forbidden: {project_uid!r}') 

99 action = self._find_by_ext_name(project, desc.owner.extName, user) 

100 

101 if not action: 

102 action = self._find_by_ext_name(self.root.app, desc.owner.extName, user) 

103 

104 if not action: 

105 raise gws.NotFoundError(f'action {desc.owner.extName!r}: not found') 

106 

107 fn = getattr(action, desc.methodName) 

108 return fn, request 

109 

110 def find_action(self, project, ext_type, user): 

111 if project: 

112 a = self._find_by_ext_type(project, ext_type, user) 

113 if a: 

114 return a 

115 return self._find_by_ext_type(self.root.app, ext_type, user) 

116 

117 # @TODO build indexes for this 

118 

119 def _find_by_ext_name(self, obj, ext_name: str, user: gws.User): 

120 for a in obj.actions: 

121 if a.extName == ext_name: 

122 if not user.can_use(a): 

123 raise gws.ForbiddenError(f'action {ext_name!r}: forbidden in {obj!r}') 

124 return a 

125 

126 def _find_by_ext_type(self, obj, ext_type: str, user: gws.User): 

127 for a in obj.actions: 

128 if a.extType == ext_type: 

129 if not user.can_use(a): 

130 raise gws.ForbiddenError(f'action {ext_type!r}: forbidden in {obj!r}') 

131 return a