Coverage for gws-app/gws/lib/zipx/__init__.py: 87%

89 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""Zipfile wrappers.""" 

2 

3import io 

4import os 

5import shutil 

6import zipfile 

7 

8import gws 

9 

10 

class Error(gws.Error):
    """Error raised by the zip helpers, e.g. for a source that cannot be zipped."""

13 

14 

def zip_to_path(path: str, *sources, flat: bool = False) -> int:
    """Write the given sources into a zip archive stored in a file.

    Args:
        path: Path of the zip archive to create.
        sources: Items to add. Each item is a path to a file, a path to a
            directory (its files are added recursively), or a dict mapping
            archive names to contents.
        flat: If ``True``, every entry is stored under its base name only.
            Otherwise files keep the path they were passed with, directory
            members are stored relative to their directory and dict keys
            are used unchanged. Default is ``False``.

    Returns:
        The number of entries written to the archive. ``0`` means there was
        nothing to write, in which case no archive is created.

    Raises:
        Error: If a source is neither a dict, a directory nor a file.
    """

    entry_count = _zip(path, sources, flat)
    return entry_count

29 

30 

def zip_to_bytes(*sources, flat: bool = False) -> bytes:
    """Build a zip archive in memory and return it as bytes.

    Args:
        sources: Items to add. Each item is a path to a file, a path to a
            directory (its files are added recursively), or a dict mapping
            archive names to contents.
        flat: If ``True``, every entry is stored under its base name only.
            Otherwise files keep the path they were passed with, directory
            members are stored relative to their directory and dict keys
            are used unchanged. Default is ``False``.

    Returns:
        The raw bytes of the zip archive, or ``b''`` if no entry was written.

    Raises:
        Error: If a source is neither a dict, a directory nor a file.
    """

    with io.BytesIO() as buffer:
        if not _zip(buffer, sources, flat):
            # Nothing was archived: report an empty result instead of an empty zip.
            return b''
        return buffer.getvalue()

46 

47 

def unzip_path(path: str, target_dir: str, flat: bool = False) -> int:
    """Extract a zip archive stored in a file into a directory.

    Args:
        path: Path to the zip archive.
        target_dir: Directory the files are extracted into.
        flat: If ``True``, the directory part of each archive name is dropped
            and files are written directly into ``target_dir``. Otherwise the
            archive's directory structure is recreated below ``target_dir``.
            Default is ``False``.

    Returns:
        The number of files extracted. Entries with invalid names are
        skipped and not counted.
    """

    extracted = _unzip(path, target_dir, None, flat)
    return extracted

62 

63 

def unzip_bytes(source: bytes, target_dir: str, flat: bool = False) -> int:
    """Extract an in-memory zip archive into a directory.

    Args:
        source: Content of the zip archive as bytes.
        target_dir: Directory the files are extracted into.
        flat: If ``True``, the directory part of each archive name is dropped
            and files are written directly into ``target_dir``. Otherwise the
            archive's directory structure is recreated below ``target_dir``.
            Default is ``False``.

    Returns:
        The number of files extracted. Entries with invalid names are
        skipped and not counted.
    """

    buffer = io.BytesIO(source)
    with buffer:
        return _unzip(buffer, target_dir, None, flat)

79 

80 

def unzip_path_to_dict(path: str, flat: bool = False) -> dict:
    """Read a zip archive stored in a file into a dict.

    Args:
        path: Path to the zip archive.
        flat: If ``True``, the dict keys are the base names of the archived
            files, otherwise their full archive names. Default is ``False``.

    Returns:
        A dict mapping file names to the file contents as bytes. Entries
        with invalid names are skipped.
    """

    contents: dict = {}
    _unzip(path, None, contents, flat)
    return contents

96 

97 

def unzip_bytes_to_dict(source: bytes, flat: bool = False) -> dict:
    """Read an in-memory zip archive into a dict.

    Args:
        source: Content of the zip archive as bytes.
        flat: If ``True``, the dict keys are the base names of the archived
            files, otherwise their full archive names. Default is ``False``.

    Returns:
        A dict mapping file names to the file contents as bytes. Entries
        with invalid names are skipped.
    """

    contents: dict = {}
    with io.BytesIO(source) as buffer:
        _unzip(buffer, None, contents, flat)
    return contents

114 

115 

116## 

117 

def _zip(target, sources, flat):
    """Collect all sources and write them into a deflate-compressed zip archive.

    Args:
        target: Path or file object handed to ``zipfile.ZipFile`` for writing.
        sources: Iterable of dicts (archive name -> content), directory paths
            and file paths.
        flat: If true, store every entry under its base name only.

    Returns:
        The number of entries written, ``0`` if there was nothing to write
        (the archive is not opened in that case).

    Raises:
        Error: If a source is neither a dict, a directory nor a file.
    """

    file_entries = []  # [path on disk, name in the archive]
    data_entries = {}  # name in the archive -> content

    for source in sources:
        if isinstance(source, dict):
            for key, payload in source.items():
                arc_name = os.path.basename(key) if flat else key
                data_entries[arc_name] = payload
        elif os.path.isdir(source):
            for member in _scan_dir(source):
                # Directory members are archived relative to the directory itself.
                arc_name = os.path.basename(member) if flat else os.path.relpath(member, source)
                file_entries.append([member, arc_name])
        elif os.path.isfile(source):
            file_entries.append([source, os.path.basename(source) if flat else source])
        else:
            raise Error(f'zip: invalid argument: {source!r}')

    if not (file_entries or data_entries):
        return 0

    with zipfile.ZipFile(target, 'w', compression=zipfile.ZIP_DEFLATED) as archive:
        for disk_path, arc_name in file_entries:
            archive.write(disk_path, arc_name)
        for arc_name, payload in data_entries.items():
            archive.writestr(arc_name, payload)

    return len(file_entries) + len(data_entries)

160 

161 

162def _unzip(source, target_dir, target_dict, flat): 

163 cnt = 0 

164 

165 with zipfile.ZipFile(source, 'r') as zf: 

166 for zi in zf.infolist(): 

167 if zi.is_dir(): 

168 continue 

169 

170 path = zi.filename 

171 base = os.path.basename(path) 

172 

173 if path.startswith(('/', '.')) or '..' in path or not base: 

174 gws.log.warning(f'unzip: invalid file name: {path!r}') 

175 continue 

176 

177 if target_dict is not None: 

178 with zf.open(path) as src: 

179 target_dict[base if flat else path] = src.read() 

180 continue 

181 

182 if flat: 

183 dst = os.path.join(target_dir, base) 

184 else: 

185 dst = os.path.join(target_dir, *path.split('/')) 

186 os.makedirs(os.path.dirname(dst), exist_ok=True) 

187 

188 with zf.open(path) as src, open(dst, 'wb') as fp: 

189 shutil.copyfileobj(src, fp) 

190 cnt += 1 

191 

192 return cnt 

193 

194 

195def _scan_dir(source_dir): 

196 paths = [] 

197 

198 for de in os.scandir(source_dir): 

199 if de.is_file(): 

200 paths.append(de.path) 

201 elif de.is_dir(): 

202 paths.extend(_scan_dir(de.path)) 

203 

204 return paths