Coverage for gws-app/gws/base/storage/core.py: 57%
79 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
1"""Storage object."""
3from typing import Optional
5import gws
6import gws.lib.jsonx
class Verb(gws.Enum):
    """Actions that can be requested on a storage object."""

    read = 'read'
    """Read an entry."""
    write = 'write'
    """Write an entry."""
    list = 'list'
    """List entries."""
    delete = 'delete'
    """Delete an entry."""
class State(gws.Data):
    """Entry names of a storage object plus the permissions a user holds on it."""

    names: list[str]
    """List of entry names."""
    canRead: bool
    """User can read entries."""
    canWrite: bool
    """User can write entries."""
    canCreate: bool
    """User can create entries."""
    canDelete: bool
    """User can delete entries."""
class Request(gws.Request):
    """Storage request: a verb plus an optional entry name and entry payload."""

    # Action to perform.
    verb: Verb
    # Name of the entry the action applies to (checked for read, write and delete).
    entryName: Optional[str]
    # Payload to store (checked for write).
    entryData: Optional[dict]
class Response(gws.Response):
    """Storage response: optional entry data and the storage state."""

    # Decoded entry content; stays unset unless a read found a record.
    data: Optional[dict]
    # State computed after the request has been handled.
    state: State
class Config(gws.ConfigWithAccess):
    """Storage configuration"""

    providerUid: Optional[str]
    """Storage provider uid."""
    categoryName: Optional[str]
    """Category name."""
class Props(gws.Props):
    """Properties of a storage object: its state for a given user."""

    state: State
class Object(gws.Node):
    """Storage object: a category of named entries kept in a storage provider.

    Every operation is checked against the permissions the requesting user
    holds on this object (read, write, create, delete).
    """

    storageProvider: gws.StorageProvider
    categoryName: str

    def configure(self):
        """Resolve the storage provider and store the configured category name."""
        self.configure_provider()
        self.categoryName = self.cfg('categoryName')

    def configure_provider(self):
        """Look up the storage provider by the configured ``providerUid``.

        Returns:
            ``True`` once a provider has been found.

        Raises:
            gws.Error: If the storage manager returns no provider.
        """
        self.storageProvider = self.root.app.storageMgr.find_provider(self.cfg('providerUid'))
        if not self.storageProvider:
            raise gws.Error('storage provider not found')
        return True

    def props(self, user):
        """Return the properties of this object for the given user."""
        # Use the module's own Props subclass so the result matches its declaration.
        return Props(state=self.get_state_for(user))

    def get_state_for(self, user):
        """Build the storage state for the given user.

        Entry names are only listed when the user can read this object;
        otherwise ``names`` is an empty list.
        """
        can_read = user.can_read(self)
        return State(
            names=self.storageProvider.list_names(self.categoryName) if can_read else [],
            canRead=can_read,
            canWrite=user.can_write(self),
            canDelete=user.can_delete(self),
            canCreate=user.can_create(self),
        )

    def handle_request(self, req: gws.WebRequester, p: Request) -> Response:
        """Perform the storage action described by the request.

        Args:
            req: Web requester; its ``user`` is used for permission checks
                and as the author of written entries.
            p: Request with the verb and, depending on the verb, the entry
                name and entry data.

        Returns:
            A response with the decoded entry data (only set when a read
            found a record) and the state computed after the action.

        Raises:
            gws.ForbiddenError: If the user lacks the required permission or
                a required request field is missing.
        """
        state = self.get_state_for(req.user)
        data = None

        if p.verb == Verb.list:
            # Listing yields no data; the names travel in the returned state.
            if not state.canRead:
                raise gws.ForbiddenError()

        elif p.verb == Verb.read:
            if not state.canRead or not p.entryName:
                raise gws.ForbiddenError()
            rec = self.storageProvider.read(self.categoryName, p.entryName)
            if rec:
                data = gws.lib.jsonx.from_string(rec.data)

        elif p.verb == Verb.write:
            if not p.entryName or not p.entryData:
                raise gws.ForbiddenError()

            # state.names is empty for users without read permission, so it
            # cannot tell an existing entry from a new one. Check against the
            # provider's actual names; otherwise a create-only user could
            # overwrite existing entries.
            if state.canRead:
                existing_names = state.names
            else:
                existing_names = self.storageProvider.list_names(self.categoryName)
            entry_exists = p.entryName in existing_names

            if entry_exists and not state.canWrite:
                raise gws.ForbiddenError()
            if not entry_exists and not state.canCreate:
                raise gws.ForbiddenError()

            encoded = gws.lib.jsonx.to_string(p.entryData)
            self.storageProvider.write(self.categoryName, p.entryName, encoded, req.user.uid)

        elif p.verb == Verb.delete:
            if not state.canDelete or not p.entryName:
                raise gws.ForbiddenError()
            self.storageProvider.delete(self.categoryName, p.entryName)

        # Recompute the state: write and delete may have changed the name list.
        return Response(data=data, state=self.get_state_for(req.user))