Coverage for gws-app / gws / plugin / model_field / file / __init__.py: 0%
154 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""File field."""
3from typing import Optional, cast
5import gws
6import gws.base.database.model
7import gws.base.model.field
8import gws.lib.mime
9import gws.lib.osx
10import gws.lib.sa as sa
12gws.ext.new.modelField('file')
class Config(gws.base.model.field.Config):
    """Configuration for the file field."""

    # The field refuses to configure unless at least one of
    # `contentColumn` / `pathColumn` resolves to a column.

    contentColumn: str = ''
    """Column name for the file content, if stored in the database."""

    pathColumn: str = ''
    """Column name for the file path, if stored in the filesystem."""

    nameColumn: str = ''
    """Column name for the file name, if stored in the database or filesystem."""
class Props(gws.base.model.field.Props):
    """Properties of the file field; adds nothing to the base field props."""
class FileInputProps(gws.Data):
    """File payload consisting of raw content and a file name.

    NOTE(review): not referenced elsewhere in this module; presumably
    describes an uploaded file -- confirm against the callers.
    """

    # Raw bytes of the file.
    content: bytes
    # File name.
    name: str
class ServerFileProps(gws.Data):
    """Description of a stored file, as placed into the feature props by the field."""

    # URL path under which the file can be requested.
    downloadUrl: str
    # File extension derived from the mime type of the file.
    extension: str
    # Display label; the field uses the stored file name (or an empty string).
    label: str
    # URL path of a preview; the field fills it only for `image*` mime types.
    previewUrl: str
    # Size of the content; 0 when unknown.
    size: int
class ClientFileProps(gws.Data):
    """File payload consisting of a file name and raw content.

    NOTE(review): not referenced elsewhere in this module; presumably
    the shape of a file sent by the client -- confirm against the callers.
    """

    # File name.
    name: str
    # Raw bytes of the file.
    content: bytes
class FileValue(gws.Data):
    """Python-side value of a file field.

    Only the attributes whose columns are configured on the field are
    populated when a value is loaded from a record.
    """

    # Raw bytes of the file (taken from the content column or from client props).
    content: bytes
    # File name (taken from the name column or from client props).
    name: str
    # File path (taken from the path column).
    path: str
    # Length of the content, as computed by the database.
    size: int
class Object(gws.base.model.field.Object):
    """Model field that keeps a file in the columns of a database model.

    The options ``contentColumn``, ``pathColumn`` and ``nameColumn`` are
    resolved to columns of the owning model. On the feature, the value is a
    ``FileValue``; in the feature props it is a ``ServerFileProps`` with
    download and preview URLs.
    """

    model: gws.DatabaseModel

    attributeType = gws.AttributeType.file

    # Resolved columns; each one is None when its option is not configured.
    contentColumn: Optional[sa.Column] = None
    pathColumn: Optional[sa.Column] = None
    nameColumn: Optional[sa.Column] = None

    def __getstate__(self):
        """Return the instance attributes without the ``cols`` entry."""
        # NOTE(review): this class never sets an attribute named 'cols'; the key looks like
        # a leftover. Confirm whether contentColumn/pathColumn/nameColumn should be omitted instead
        # (they are re-created by `activate`).
        return gws.u.omit(vars(self), 'cols')

    def post_configure(self):
        """Resolve the configured columns after configuration."""
        self.configure_columns()

    def activate(self):
        """Resolve the configured columns again on activation."""
        self.configure_columns()

    def configure_columns(self):
        """Look up the configured column names in the model.

        Raises:
            gws.ConfigurationError: If neither a content nor a path column
                is configured, or if the model has no ``uidName``.
        """
        model = cast(gws.base.database.model.Object, self.model)

        column_name = self.cfg('contentColumn')
        self.contentColumn = model.column(column_name) if column_name else None

        column_name = self.cfg('pathColumn')
        self.pathColumn = model.column(column_name) if column_name else None

        column_name = self.cfg('nameColumn')
        self.nameColumn = model.column(column_name) if column_name else None

        if self.contentColumn is None and self.pathColumn is None:
            raise gws.ConfigurationError('contentColumn or pathColumn must be set')

        # Web file requests select the record by its uid column.
        if not self.model.uidName:
            raise gws.ConfigurationError('file fields require a primary key')

    def configure_widget(self):
        """Fall back to a shared ``file`` widget when the base class configures none."""
        if not super().configure_widget():
            self.widget = self.root.create_shared(gws.ext.object.modelWidget, type='file')
            return True

    ##

    def before_select(self, mc):
        """Add this field's columns to the select statement being built."""
        mc.dbSelect.columns.extend(self.select_columns(mc))

    def after_select(self, features, mc):
        """Populate the field value of each selected feature from its record."""
        for feature in features:
            self.from_record(feature, mc)

    def before_create(self, feature, mc):
        """Copy the field value into the record before it is created."""
        self.to_record(feature, mc)

    def before_update(self, feature, mc):
        """Copy the field value into the record before it is updated."""
        self.to_record(feature, mc)

    def from_record(self, feature, mc):
        """Set the feature attribute to the value loaded from the record attributes."""
        feature.set(self.name, self.load_value(feature.record.attributes, mc))

    def to_record(self, feature, mc):
        """Write content and name of the feature's file value into the record.

        Does nothing when the user cannot write this field or when the
        feature has no file value. The path column is never written.
        """
        if not mc.user.can_write(self):
            return

        # @TODO store in the filesystem

        fv = cast(FileValue, feature.get(self.name))
        if not fv:
            return

        attributes = feature.record.attributes
        if self.contentColumn is not None:
            attributes[self.contentColumn.name] = fv.content
        if self.nameColumn is not None:
            attributes[self.nameColumn.name] = fv.name

    # @TODO merge with scalar_field?

    def from_props(self, feature, mc):
        """Convert the prop value of this field, if any, and set it on the feature."""
        value = feature.props.attributes.get(self.name)
        if value is not None:
            value = self.prop_to_python(feature, value, mc)
        if value is not None:
            feature.set(self.name, value)

    def to_props(self, feature, mc):
        """Convert the feature value, if any, and store it in the feature props.

        Does nothing when the user cannot read this field.
        """
        if not mc.user.can_read(self):
            return
        value = feature.get(self.name)
        if value is not None:
            value = self.python_to_prop(feature, value, mc)
        if value is not None:
            feature.props.attributes[self.name] = value

    ##

    def prop_to_python(self, feature, value, mc) -> FileValue:
        """Build a ``FileValue`` from the ``content`` and ``name`` of a prop value.

        Returns ``gws.ErrorValue`` if the construction raises ``ValueError``.
        """
        try:
            return FileValue(
                content=gws.u.get(value, 'content'),
                name=gws.u.get(value, 'name'),
            )
        except ValueError:
            return gws.ErrorValue

    def python_to_prop(self, feature, value, mc) -> ServerFileProps:
        """Describe a ``FileValue`` for the client.

        The download URL is always set; the preview URL is set only when the
        mime type starts with ``image``. Both URLs end with the file name.
        """
        fv = cast(FileValue, value)

        mime = self.get_mime_type(fv)
        ext = gws.lib.mime.extension_for(mime)

        p = ServerFileProps(
            # @TODO use a template
            label=fv.name or '',
            extension=ext,
            size=fv.size or 0,
            previewUrl='',
            downloadUrl='',
        )

        # Fallback name for unnamed files. The suffix is dropped when no extension
        # was resolved, so the name can never become 'gws.None'.
        name = fv.name or (f'gws.{ext}' if ext else 'gws')

        url_args = dict(
            projectUid=mc.project.uid,
            modelUid=self.model.uid,
            fieldName=self.name,
            featureUid=feature.uid(),
        )

        # NOTE(review): `name` is appended to the URL path as is; confirm whether it needs URL quoting.
        if mime.startswith('image'):
            p.previewUrl = gws.u.action_url_path('webFile', preview=1, **url_args) + '/' + name

        p.downloadUrl = gws.u.action_url_path('webFile', **url_args) + '/' + name

        return p

    ##

    def get_mime_type(self, fv: FileValue) -> str:
        """Return the mime type for a file value.

        The path is consulted first, then the name; without either,
        ``gws.lib.mime.TXT`` is returned.
        """
        if fv.path:
            return gws.lib.mime.for_path(fv.path)
        if fv.name:
            return gws.lib.mime.for_path(fv.name)
        # @TODO guess mime from content?
        return gws.lib.mime.TXT

    def handle_web_file_request(self, feature_uid: str, preview: bool, mc: gws.ModelContext) -> Optional[gws.ContentResponse]:
        """Load the file of a single feature and wrap it in a content response.

        Args:
            feature_uid: Value of the model's uid column identifying the record.
            preview: When true, the response carries no content file name.
            mc: Model context.

        Returns:
            The response, or ``None`` when no record matches or no value
            could be loaded.
        """
        model = cast(gws.DatabaseModel, self.model)

        # Besides the regular select columns, fetch the content itself.
        cols = self.select_columns(mc)
        if self.contentColumn is not None:
            cols.append(self.contentColumn)

        sql = sa.select(*cols).where(model.uid_column() == feature_uid)

        with model.db.connect() as conn:
            rows = list(conn.execute(sql))
            if not rows:
                return None

            # Only the first matching row is served.
            fv = self.load_value(gws.u.to_dict(rows[0]), mc)
            if not fv:
                return None

            return gws.ContentResponse(
                content=fv.content,
                contentFilename=None if preview else fv.name,
                mime=self.get_mime_type(fv),
            )

    ##

    def select_columns(self, mc):
        """Return the columns to select for this field.

        For the content column only its length is selected, labeled
        ``<field name>_length``; path and name columns are selected as is.
        """
        cs = []

        if self.contentColumn is not None:
            cs.append(sa.func.length(self.contentColumn).label(f'{self.name}_length'))
        if self.pathColumn is not None:
            cs.append(self.pathColumn)
        if self.nameColumn is not None:
            cs.append(self.nameColumn)

        return cs

    def load_value(self, attributes: dict, mc) -> Optional[FileValue]:
        """Build a ``FileValue`` from record attributes.

        Only the entries of configured columns are read; missing keys yield
        ``None`` attributes. Returns ``None`` when no column is configured.
        """
        d = {}

        if self.contentColumn is not None:
            d['size'] = attributes.get(f'{self.name}_length')
            d['content'] = attributes.get(self.contentColumn.name)
        if self.pathColumn is not None:
            d['path'] = attributes.get(self.pathColumn.name)
        if self.nameColumn is not None:
            d['name'] = attributes.get(self.nameColumn.name)

        return FileValue(**d) if d else None