Coverage for gws-app / gws / plugin / model_field / file / __init__.py: 0%

154 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""File field.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.base.database.model 

7import gws.base.model.field 

8import gws.lib.mime 

9import gws.lib.osx 

10import gws.lib.sa as sa 

11 

# Extension marker naming this module's field type as 'file'.
# NOTE(review): presumably picked up by the gws extension/spec machinery to register
# the classes defined below under the 'file' model field type -- confirm.
gws.ext.new.modelField('file')

13 

14 

class Config(gws.base.model.field.Config):
    """Configuration for the file field.

    Either ``contentColumn`` or ``pathColumn`` has to be set,
    otherwise the field raises a configuration error.
    """

    contentColumn: str = ''
    """Name of the column holding the file content, when the file is stored in the database."""
    pathColumn: str = ''
    """Name of the column holding the file path, when the file is stored in the filesystem."""
    nameColumn: str = ''
    """Name of the column holding the file name, for database or filesystem storage."""

24 

25 

class Props(gws.base.model.field.Props):
    """Properties of the file field; adds nothing to the base field properties."""

28 

29 

class FileInputProps(gws.Data):
    """File content together with its name.

    NOTE(review): not referenced anywhere else in this module -- confirm it is still used.
    """

    content: bytes
    """Raw bytes of the file."""
    name: str
    """Name of the file."""

33 

34 

class ServerFileProps(gws.Data):
    """Description of a stored file, as produced by the field's ``python_to_prop``."""

    downloadUrl: str
    """URL path of the ``webFile`` action that delivers the file."""
    extension: str
    """File extension derived from the file's mime type."""
    label: str
    """Display label; the file name, or an empty string if there is none."""
    previewUrl: str
    """URL path of the ``webFile`` action in preview mode; empty unless the mime type is an image."""
    size: int
    """File size, ``0`` if unknown."""

41 

42 

class ClientFileProps(gws.Data):
    """File name together with its content.

    NOTE(review): presumably the shape of a file uploaded by the client; it is not
    referenced anywhere else in this module -- confirm.
    """

    name: str
    """Name of the file."""
    content: bytes
    """Raw bytes of the file."""

46 

47 

class FileValue(gws.Data):
    """Python-side value of a file field.

    Which attributes are filled depends on where the value comes from: a value
    loaded from a record carries whatever the configured columns provide,
    a value built from client props carries ``content`` and ``name`` only.
    """

    content: bytes
    """Raw bytes of the file."""
    name: str
    """Name of the file."""
    path: str
    """Filesystem path of the file."""
    size: int
    """Length of the stored content."""

53 

54 

class Object(gws.base.model.field.Object):
    """Model field that attaches a file to a feature of a database model.

    The file is described by up to three columns of the model table:

    - ``contentColumn``: the file bytes,
    - ``pathColumn``: a filesystem path (read only so far, writing is still a TODO),
    - ``nameColumn``: the file name.

    At least one of ``contentColumn`` and ``pathColumn`` must be configured,
    and the model must have a primary key (``uidName``).
    """

    model: gws.DatabaseModel

    attributeType = gws.AttributeType.file

    # Columns resolved from the configuration by `configure_columns`.
    contentColumn: Optional[sa.Column] = None
    pathColumn: Optional[sa.Column] = None
    nameColumn: Optional[sa.Column] = None

    def __getstate__(self):
        """Return the instance attributes to serialize, without the ``cols`` entry."""
        # NOTE(review): nothing in this class assigns ``cols``. If the intent is to keep
        # the SQLAlchemy columns out of the serialized state (they are rebuilt in
        # `activate`), the omitted keys should probably be the three ``*Column``
        # attributes -- confirm against the base class before changing.
        return gws.u.omit(vars(self), 'cols')

    def post_configure(self):
        """Resolve the configured columns once the configuration is complete."""
        self.configure_columns()

    def activate(self):
        """Resolve the configured columns again when the object is activated."""
        self.configure_columns()

    def configure_columns(self):
        """Look up the configured column names in the model table.

        Raises:
            gws.ConfigurationError: If neither ``contentColumn`` nor ``pathColumn``
                is configured, or if the model has no ``uidName``.
        """
        model = cast(gws.base.database.model.Object, self.model)

        def resolve(option: str) -> Optional[sa.Column]:
            # An empty or missing option means "this column is not used".
            column_name = self.cfg(option)
            return model.column(column_name) if column_name else None

        self.contentColumn = resolve('contentColumn')
        self.pathColumn = resolve('pathColumn')
        self.nameColumn = resolve('nameColumn')

        if self.contentColumn is None and self.pathColumn is None:
            raise gws.ConfigurationError('contentColumn or pathColumn must be set')

        # Single files are fetched by primary key, see `handle_web_file_request`.
        if not self.model.uidName:
            raise gws.ConfigurationError('file fields require a primary key')

    def configure_widget(self):
        """Use the configured widget or fall back to a shared ``file`` widget.

        Returns:
            Always ``True``: after this call the field has a widget, whether it came
            from the base class or from the fallback.
        """
        if not super().configure_widget():
            self.widget = self.root.create_shared(gws.ext.object.modelWidget, type='file')
        return True

    ##

    def before_select(self, mc):
        """Add this field's columns to the select statement being built."""
        mc.dbSelect.columns.extend(self.select_columns(mc))

    def after_select(self, features, mc):
        """Populate the field value of each selected feature from its record."""
        for feature in features:
            self.from_record(feature, mc)

    def before_create(self, feature, mc):
        """Copy the field value into the feature record before an insert."""
        self.to_record(feature, mc)

    def before_update(self, feature, mc):
        """Copy the field value into the feature record before an update."""
        self.to_record(feature, mc)

    def from_record(self, feature, mc):
        """Set the feature attribute to the value loaded from the record attributes."""
        feature.set(self.name, self.load_value(feature.record.attributes, mc))

    def to_record(self, feature, mc):
        """Write file content and name from the feature into its record.

        Nothing is written if the user has no write permission for this field
        or if the feature carries no value for it.
        """
        if not mc.user.can_write(self):
            return

        # @TODO store in the filesystem

        fv = cast(FileValue, feature.get(self.name))
        if not fv:
            return

        record_attributes = feature.record.attributes
        if self.contentColumn is not None:
            record_attributes[self.contentColumn.name] = fv.content
        if self.nameColumn is not None:
            record_attributes[self.nameColumn.name] = fv.name

    # @TODO merge with scalar_field?

    def from_props(self, feature, mc):
        """Convert the client-side prop of this field and store it on the feature."""
        prop = feature.props.attributes.get(self.name)
        if prop is None:
            return
        value = self.prop_to_python(feature, prop, mc)
        if value is not None:
            feature.set(self.name, value)

    def to_props(self, feature, mc):
        """Convert the feature value to a prop, if the user may read this field."""
        if not mc.user.can_read(self):
            return
        value = feature.get(self.name)
        if value is None:
            return
        prop = self.python_to_prop(feature, value, mc)
        if prop is not None:
            feature.props.attributes[self.name] = prop

    ##

    def prop_to_python(self, feature, value, mc) -> FileValue:
        """Build a `FileValue` from the ``content`` and ``name`` of a client prop.

        Returns:
            The file value, or ``gws.ErrorValue`` if building it raises ``ValueError``.
        """
        try:
            return FileValue(
                content=gws.u.get(value, 'content'),
                name=gws.u.get(value, 'name'),
            )
        except ValueError:
            return gws.ErrorValue

    def python_to_prop(self, feature, value, mc) -> ServerFileProps:
        """Describe a stored file for the client.

        The result carries label, extension and size of the file plus a download
        URL for the ``webFile`` action; a preview URL is added for image mime types.
        """
        fv = cast(FileValue, value)

        mime = self.get_mime_type(fv)
        # Guard against a missing extension, so that neither the props nor
        # the fallback file name below end up containing 'None'.
        ext = gws.lib.mime.extension_for(mime) or ''

        # The trailing URL segment: the real file name or a generic one.
        # NOTE(review): the name is appended verbatim, without URL-escaping --
        # confirm that names containing '/', '?' or '#' are handled elsewhere.
        name = fv.name or (f'gws.{ext}' if ext else 'gws')

        url_args = dict(
            projectUid=mc.project.uid,
            modelUid=self.model.uid,
            fieldName=self.name,
            featureUid=feature.uid(),
        )

        preview_url = ''
        if mime.startswith('image'):
            preview_url = gws.u.action_url_path('webFile', preview=1, **url_args) + '/' + name

        download_url = gws.u.action_url_path('webFile', **url_args) + '/' + name

        return ServerFileProps(
            # @TODO use a template
            label=fv.name or '',
            extension=ext,
            size=fv.size or 0,
            previewUrl=preview_url,
            downloadUrl=download_url,
        )

    ##

    def get_mime_type(self, fv: FileValue) -> str:
        """Guess the mime type from the file path or, failing that, the file name.

        Falls back to ``gws.lib.mime.TXT`` when neither is available.
        """
        source = fv.path or fv.name
        if source:
            return gws.lib.mime.for_path(source)
        # @TODO guess mime from content?
        return gws.lib.mime.TXT

    def handle_web_file_request(self, feature_uid: str, preview: bool, mc: gws.ModelContext) -> Optional[gws.ContentResponse]:
        """Load the file of a single feature and wrap it in a content response.

        Args:
            feature_uid: Primary key value of the feature that owns the file.
            preview: If true, the response carries no file name.
            mc: Model context of the request.

        Returns:
            The response with content, file name and mime type, or ``None`` if
            no record matches ``feature_uid`` or no value could be loaded.
        """
        model = cast(gws.DatabaseModel, self.model)

        # Unlike a regular select, a file request also needs the content itself.
        cols = self.select_columns(mc)
        if self.contentColumn is not None:
            cols.append(self.contentColumn)

        sql = sa.select(*cols).where(model.uid_column() == feature_uid)

        with model.db.connect() as conn:
            rows = list(conn.execute(sql))
        if not rows:
            return None

        # Only the first matching row is used.
        fv = self.load_value(gws.u.to_dict(rows[0]), mc)
        if not fv:
            return None

        return gws.ContentResponse(
            content=fv.content,
            contentFilename=None if preview else fv.name,
            mime=self.get_mime_type(fv),
        )

    ##

    def select_columns(self, mc):
        """Return the columns to select for this field.

        The content itself is not selected, only its length, labelled
        ``<field name>_length``; path and name columns are selected as they are.
        """
        cs = []

        if self.contentColumn is not None:
            cs.append(sa.func.length(self.contentColumn).label(f'{self.name}_length'))
        cs.extend(col for col in (self.pathColumn, self.nameColumn) if col is not None)

        return cs

    def load_value(self, attributes: dict, mc) -> Optional[FileValue]:
        """Build a `FileValue` from record attributes.

        Args:
            attributes: Mapping of column names (and the ``<field name>_length``
                label) to values; missing keys yield ``None`` attributes.
            mc: Model context (unused here).

        Returns:
            The file value, or ``None`` if no column is configured.
        """
        d = {}

        if self.contentColumn is not None:
            d['size'] = attributes.get(f'{self.name}_length')
            d['content'] = attributes.get(self.contentColumn.name)
        if self.pathColumn is not None:
            d['path'] = attributes.get(self.pathColumn.name)
        if self.nameColumn is not None:
            d['name'] = attributes.get(self.nameColumn.name)

        return FileValue(**d) if d else None