Coverage for gws-app/gws/lib/cli/__init__.py: 65%

162 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""Utilities for CLI commands.""" 

2 

3import re 

4import os 

5import shutil 

6import sys 

7import subprocess 

8import time 

9import math 

10import traceback 

11 

12SCRIPT_NAME = '' 

13 

14_COLOR = { 

15 'black': '\x1b[30m', 

16 'red': '\x1b[31m', 

17 'green': '\x1b[32m', 

18 'yellow': '\x1b[33m', 

19 'blue': '\x1b[34m', 

20 'magenta': '\x1b[35m', 

21 'cyan': '\x1b[36m', 

22 'white': '\x1b[37m', 

23 'reset': '\x1b[0m', 

24} 

25 

26 

27def cprint(clr, msg): 

28 if SCRIPT_NAME: 

29 msg = '[' + SCRIPT_NAME + '] ' + msg 

30 if clr and sys.stdout.isatty(): 

31 msg = _COLOR[clr] + msg + _COLOR['reset'] 

32 sys.stdout.write(msg + '\n') 

33 sys.stdout.flush() 

34 

35 

36def error(msg): 

37 cprint('red', msg) 

38 

39 

40def fatal(msg): 

41 cprint('red', msg) 

42 sys.exit(1) 

43 

44 

45def warning(msg): 

46 cprint('yellow', msg) 

47 

48 

49def info(msg): 

50 cprint('cyan', msg) 

51 

52 

53## 

54 

55def run(cmd): 

56 if isinstance(cmd, list): 

57 cmd = ' '.join(cmd) 

58 cmd = re.sub(r'\s+', ' ', cmd.strip()) 

59 info(f'> {cmd}') 

60 res = subprocess.run(cmd, shell=True, capture_output=False) 

61 if res.returncode: 

62 fatal(f'COMMAND FAILED, code {res.returncode}') 

63 

64 

65def exec(cmd): 

66 try: 

67 return ( 

68 subprocess 

69 .run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) 

70 .stdout.decode('utf8').strip() 

71 ) 

72 except Exception as exc: 

73 return f'> {cmd} FAILED: {exc}' 

74 

75 

76def find_dirs(dirname): 

77 if not os.path.isdir(dirname): 

78 return 

79 

80 de: os.DirEntry 

81 for de in os.scandir(dirname): 

82 if de.name.startswith('.'): 

83 continue 

84 if de.is_dir(): 

85 yield de.path 

86 

87 

88def find_files(dirname, pattern=None, deep=True): 

89 if not os.path.isdir(dirname): 

90 return 

91 

92 de: os.DirEntry 

93 for de in os.scandir(dirname): 

94 if de.name.startswith('.'): 

95 continue 

96 if de.is_dir() and deep: 

97 yield from find_files(de.path, pattern) 

98 continue 

99 if de.is_file() and (pattern is None or re.search(pattern, de.path)): 

100 yield de.path 

101 

102 

103def ensure_dir(path, clear=False): 

104 os.makedirs(path, exist_ok=True) 

105 if clear: 

106 shutil.rmtree(path) 

107 return path 

108 

109 

110def read_file(path): 

111 with open(path, 'rt', encoding='utf8') as fp: 

112 return fp.read().strip() 

113 

114 

115def write_file(path, text): 

116 with open(path, 'wt', encoding='utf8') as fp: 

117 fp.write(text) 

118 

119 

120def parse_args(argv): 

121 args = {} 

122 opt = None 

123 n = 0 

124 

125 for a in argv: 

126 if a == '-': 

127 args['_rest'] = [] 

128 elif '_rest' in args: 

129 args['_rest'].append(a) 

130 elif a.startswith('--'): 

131 opt = a[2:] 

132 args[opt] = True 

133 elif a.startswith('-'): 

134 opt = a[1:] 

135 args[opt] = True 

136 elif opt: 

137 args[opt] = a 

138 opt = None 

139 else: 

140 args[n] = a 

141 n += 1 

142 

143 return args 

144 

145 

146def main(name, main_fn, usage): 

147 global SCRIPT_NAME 

148 

149 SCRIPT_NAME = name 

150 

151 args = parse_args(sys.argv) 

152 if not args or 'h' in args or 'help' in args: 

153 print('\n' + usage.strip() + '\n') 

154 sys.exit(0) 

155 

156 try: 

157 sys.exit(main_fn(args)) 

158 except KeyboardInterrupt: 

159 pass 

160 except Exception as exc: 

161 error('INTERNAL ERROR') 

162 error(traceback.format_exc()) 

163 

164 

165def text_table(data, header=None, delim=' | '): 

166 """Format a list of dicts as a text-mode table.""" 

167 

168 data = list(data) 

169 

170 if not data: 

171 return '' 

172 

173 is_dict = isinstance(data[0], dict) 

174 

175 print_header = header is not None 

176 if header is None or header == 'auto': 

177 header = data[0].keys() if is_dict else list(range(len(data[0]))) 

178 

179 widths = [len(h) if print_header else 1 for h in header] 

180 

181 def get(d, h): 

182 if is_dict: 

183 return d.get(h, '') 

184 try: 

185 return d[h] 

186 except IndexError: 

187 return '' 

188 

189 for d in data: 

190 widths = [ 

191 max(a, b) 

192 for a, b in zip( 

193 widths, 

194 [len(str(get(d, h))) for h in header] 

195 ) 

196 ] 

197 

198 def field(n, v): 

199 if isinstance(v, (int, float)): 

200 return str(v).rjust(widths[n]) 

201 return str(v).ljust(widths[n]) 

202 

203 rows = [] 

204 

205 if print_header: 

206 hdr = delim.join(field(n, h) for n, h in enumerate(header)) 

207 rows.append(hdr) 

208 rows.append('-' * len(hdr)) 

209 

210 for d in data: 

211 rows.append(delim.join(field(n, get(d, h)) for n, h in enumerate(header))) 

212 

213 return '\n'.join(rows) 

214 

215 

216class ProgressIndicator: 

217 def __init__(self, title, total=0, resolution=10): 

218 self.resolution = resolution 

219 self.title = title 

220 self.total = total 

221 self.progress = 0 

222 self.lastd = 0 

223 self.starttime = 0 

224 

225 def __enter__(self): 

226 self.log(f'START ({self.total})' if self.total else 'START') 

227 self.starttime = time.time() 

228 return self 

229 

230 def __exit__(self, exc_type, exc_val, exc_tb): 

231 if not exc_type: 

232 ts = time.time() - self.starttime 

233 self.log(f'END ({ts:.2f} sec)') 

234 

235 def update(self, add=1): 

236 if not self.total: 

237 return 

238 self.progress += add 

239 p = math.floor(self.progress * 100.0 / self.total) 

240 if p > 100: 

241 p = 100 

242 d = round(p / self.resolution) * self.resolution 

243 if d > self.lastd: 

244 self.log(f'{d}%') 

245 self.lastd = d 

246 

247 def log(self, s): 

248 info(f'{self.title}: {s}')