Coverage for gws-app/gws/plugin/model_field/file/__init__.py: 0%

153 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""File field.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.base.database.model 

7import gws.base.model.field 

8import gws.lib.mime 

9import gws.lib.osx 

10import gws.lib.sa as sa 

11 

12gws.ext.new.modelField('file') 

13 

14 

15class Config(gws.base.model.field.Config): 

16 """Configuration for the file field.""" 

17 

18 contentColumn: str = '' 

19 """Column name for the file content, if stored in the database.""" 

20 pathColumn: str = '' 

21 """Column name for the file path, if stored in the filesystem.""" 

22 nameColumn: str = '' 

23 """Column name for the file name, if stored in the database or filesystem.""" 

24 

25 

26class Props(gws.base.model.field.Props): 

27 pass 

28 

29 

30class Cols(gws.Data): 

31 content: Optional[sa.Column] 

32 path: Optional[sa.Column] 

33 name: Optional[sa.Column] 

34 

35 

36class FileInputProps(gws.Data): 

37 content: bytes 

38 name: str 

39 

40 

41class ServerFileProps(gws.Data): 

42 downloadUrl: str 

43 extension: str 

44 label: str 

45 previewUrl: str 

46 size: int 

47 

48 

49class ClientFileProps(gws.Data): 

50 name: str 

51 content: bytes 

52 

53 

54class FileValue(gws.Data): 

55 content: bytes 

56 name: str 

57 path: str 

58 size: int 

59 

60 

61class Object(gws.base.model.field.Object): 

62 model: gws.DatabaseModel 

63 

64 attributeType = gws.AttributeType.file 

65 cols: Cols 

66 

67 def __getstate__(self): 

68 return gws.u.omit(vars(self), 'cols') 

69 

70 def post_configure(self): 

71 self.configure_columns() 

72 

73 def activate(self): 

74 self.configure_columns() 

75 

76 def configure_columns(self): 

77 model = cast(gws.base.database.model.Object, self.model) 

78 

79 self.cols = Cols() 

80 

81 p = self.cfg('contentColumn') 

82 self.cols.content = model.column(p) if p else None 

83 

84 p = self.cfg('pathColumn') 

85 self.cols.path = model.column(p) if p else None 

86 

87 p = self.cfg('nameColumn') 

88 self.cols.name = model.column(p) if p else None 

89 

90 if self.cols.content is None and self.cols.path is None: 

91 raise gws.ConfigurationError('contentColumn or pathColumn must be set') 

92 

93 if not self.model.uidName: 

94 raise gws.ConfigurationError('file fields require a primary key') 

95 

96 def configure_widget(self): 

97 if not super().configure_widget(): 

98 self.widget = self.root.create_shared(gws.ext.object.modelWidget, type='file') 

99 return True 

100 

101 ## 

102 

103 def before_select(self, mc): 

104 mc.dbSelect.columns.extend(self.select_columns(False, mc)) 

105 

106 def after_select(self, features, mc): 

107 for feature in features: 

108 self.from_record(feature, mc) 

109 

110 def before_create(self, feature, mc): 

111 self.to_record(feature, mc) 

112 

113 def before_update(self, feature, mc): 

114 self.to_record(feature, mc) 

115 

116 def from_record(self, feature, mc): 

117 feature.set(self.name, self.load_value(feature.record.attributes, mc)) 

118 

119 def to_record(self, feature, mc): 

120 if not mc.user.can_write(self): 

121 return 

122 

123 # @TODO store in the filesystem 

124 

125 fv = cast(FileValue, feature.get(self.name)) 

126 if fv: 

127 if self.cols.content is not None: 

128 feature.record.attributes[self.cols.content.name] = fv.content 

129 if self.cols.name is not None: 

130 feature.record.attributes[self.cols.name.name] = fv.name 

131 

132 # @TODO merge with scalar_field? 

133 

134 def from_props(self, feature, mc): 

135 value = feature.props.attributes.get(self.name) 

136 if value is not None: 

137 value = self.prop_to_python(feature, value, mc) 

138 if value is not None: 

139 feature.set(self.name, value) 

140 

141 def to_props(self, feature, mc): 

142 if not mc.user.can_read(self): 

143 return 

144 value = feature.get(self.name) 

145 if value is not None: 

146 value = self.python_to_prop(feature, value, mc) 

147 if value is not None: 

148 feature.props.attributes[self.name] = value 

149 

150 ## 

151 

152 def prop_to_python(self, feature, value, mc) -> FileValue: 

153 try: 

154 return FileValue( 

155 content=gws.u.get(value, 'content'), 

156 name=gws.u.get(value, 'name'), 

157 ) 

158 except ValueError: 

159 return gws.ErrorValue 

160 

161 def python_to_prop(self, feature, value, mc) -> ServerFileProps: 

162 fv = cast(FileValue, value) 

163 

164 mime = self.get_mime_type(fv) 

165 ext = gws.lib.mime.extension_for(mime) 

166 

167 p = ServerFileProps( 

168 # @TODO use a template 

169 label=fv.name or '', 

170 extension=ext, 

171 size=fv.size or 0, 

172 previewUrl='', 

173 downloadUrl='', 

174 ) 

175 

176 name = fv.name or f'gws.{ext}' 

177 

178 url_args = dict( 

179 projectUid=mc.project.uid, 

180 modelUid=self.model.uid, 

181 fieldName=self.name, 

182 featureUid=feature.uid(), 

183 ) 

184 

185 if mime.startswith('image'): 

186 p.previewUrl = gws.u.action_url_path('webFile', preview=1, **url_args) + '/' + name 

187 

188 p.downloadUrl = gws.u.action_url_path('webFile', **url_args) + '/' + name 

189 

190 return p 

191 

192 ## 

193 

194 def get_mime_type(self, fv: FileValue) -> str: 

195 if fv.path: 

196 return gws.lib.mime.for_path(fv.path) 

197 if fv.name: 

198 return gws.lib.mime.for_path(fv.name) 

199 # @TODO guess mime from content? 

200 return gws.lib.mime.TXT 

201 

202 def handle_web_file_request(self, feature_uid: str, preview: bool, mc: gws.ModelContext) -> Optional[gws.ContentResponse]: 

203 model = cast(gws.DatabaseModel, self.model) 

204 

205 sql = sa.select(*self.select_columns(True, mc)).where(model.uid_column().__eq__(feature_uid)) 

206 

207 with self.model.db.connect() as conn: 

208 rs = list(conn.execute(sql)) 

209 if not rs: 

210 return 

211 

212 for row in rs: 

213 fv = self.load_value(gws.u.to_dict(row), mc) 

214 return gws.ContentResponse( 

215 asAttachment=not preview, 

216 attachmentName=fv.name, 

217 content=fv.content, 

218 mime=self.get_mime_type(fv), 

219 ) 

220 

221 ## 

222 

223 def select_columns(self, with_content, mc): 

224 cs = [] 

225 

226 if self.cols.content is not None: 

227 cs.append(sa.func.length(self.cols.content).label(f'{self.name}_length')) 

228 if with_content: 

229 cs.append(self.cols.content) 

230 

231 if self.cols.path is not None: 

232 cs.append(self.cols.path) 

233 

234 if self.cols.name is not None: 

235 cs.append(self.cols.name) 

236 

237 return cs 

238 

239 def load_value(self, attributes: dict, mc) -> FileValue: 

240 d = {} 

241 

242 if self.cols.content is not None: 

243 d['size'] = attributes.get(f'{self.name}_length') 

244 d['content'] = attributes.get(self.cols.content.name) 

245 if self.cols.path is not None: 

246 d['path'] = attributes.get(self.cols.path.name) 

247 if self.cols.name is not None: 

248 d['name'] = attributes.get(self.cols.name.name) 

249 

250 if d: 

251 return FileValue(**d)