Coverage for gws-app / gws / base / action / manager.py: 45%
87 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1from typing import Optional
2import gws
3import gws.spec.runtime
6def get_action_for_cli(root: gws.Root, action_name: str, project_uid: Optional[str] = None) -> Optional[gws.Action]:
7 """Get an action object by its name and optional project UID.
9 If no project UID is provided, it searches for the action in the global scope and then in all projects.
10 """
12 if project_uid:
13 project = root.app.project(project_uid)
14 if not project:
15 gws.log.error(f'project {project_uid!r} not found')
16 return None
17 act = root.app.actionMgr.find_action(project, action_name, root.app.authMgr.systemUser)
18 if act:
19 return act
20 gws.log.error(f'action {action_name!r} not found in {project_uid!r}')
22 act = root.app.actionMgr.find_action(None, action_name, root.app.authMgr.systemUser)
23 if act:
24 return act
26 for project in root.app.projects:
27 act = root.app.actionMgr.find_action(project, action_name, root.app.authMgr.systemUser)
28 if act:
29 gws.log.info(f'using action {action_name!r} from {project.uid!r}')
30 return act
32 gws.log.error(f'action {action_name!r} not found')
35def parse_cli_request(
36 root: gws.Root,
37 command_category: gws.CommandCategory,
38 command_name: str,
39 params: dict,
40 read_options=None,
41):
42 """Parse a CLI request and return the action handler and request object."""
44 desc = root.specs.command_descriptor(command_category, command_name)
45 if not desc:
46 raise gws.NotFoundError(f'{command_category}.{command_name} not found')
48 try:
49 request = root.specs.read(params, desc.tArg, options=read_options)
50 except gws.spec.runtime.ReadError as exc:
51 raise gws.BadRequestError(f'read error: {exc!r}') from exc
53 cls = gws.u.require(root.specs.get_class(desc.tOwner))
54 action = cls()
56 fn = getattr(action, desc.methodName)
57 return fn, request
60class Object(gws.ActionManager):
61 def actions_for_project(self, project, user):
62 d = {}
64 for a in project.actions:
65 if user.can_use(a):
66 d[a.extType] = a
68 for a in self.root.app.actions:
69 if a.extType not in d and user.can_use(a):
70 d[a.extType] = a
72 return list(d.values())
74 def prepare_action(self, command_category, command_name, params, user, read_options=None):
75 desc = self.root.specs.command_descriptor(command_category, command_name)
76 if not desc:
77 raise gws.NotFoundError(f'{command_category}.{command_name} not found')
79 if desc.extCommandCategory == gws.CommandCategory.raw:
80 request = gws.Request(
81 projectUid=params.get('projectUid'),
82 localeUid=params.get('localeUid'),
83 )
84 else:
85 try:
86 request = gws.u.require(self.root.specs.read(params, desc.tArg, options=read_options))
87 except gws.spec.runtime.ReadError as exc:
88 raise gws.BadRequestError(f'read error: {exc!r}') from exc
90 action = None
92 project_uid = request.get('projectUid')
93 if project_uid:
94 project = self.root.app.project(project_uid)
95 if not project:
96 raise gws.NotFoundError(f'project not found: {project_uid!r}')
97 if not user.can_use(project):
98 raise gws.ForbiddenError(f'project forbidden: {project_uid!r}')
99 action = self._find_by_ext_name(project, desc.owner.extName, user)
101 if not action:
102 action = self._find_by_ext_name(self.root.app, desc.owner.extName, user)
104 if not action:
105 raise gws.NotFoundError(f'action {desc.owner.extName!r}: not found')
107 fn = getattr(action, desc.methodName)
108 return fn, request
110 def find_action(self, project, ext_type, user):
111 if project:
112 a = self._find_by_ext_type(project, ext_type, user)
113 if a:
114 return a
115 return self._find_by_ext_type(self.root.app, ext_type, user)
117 # @TODO build indexes for this
119 def _find_by_ext_name(self, obj, ext_name: str, user: gws.User):
120 for a in obj.actions:
121 if a.extName == ext_name:
122 if not user.can_use(a):
123 raise gws.ForbiddenError(f'action {ext_name!r}: forbidden in {obj!r}')
124 return a
126 def _find_by_ext_type(self, obj, ext_type: str, user: gws.User):
127 for a in obj.actions:
128 if a.extType == ext_type:
129 if not user.can_use(a):
130 raise gws.ForbiddenError(f'action {ext_type!r}: forbidden in {obj!r}')
131 return a