Coverage for gws-app/gws/lib/cli/__init__.py: 65%
162 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""Utilities for CLI commands."""
3import re
4import os
5import shutil
6import sys
7import subprocess
8import time
9import math
10import traceback
12SCRIPT_NAME = ''
14_COLOR = {
15 'black': '\x1b[30m',
16 'red': '\x1b[31m',
17 'green': '\x1b[32m',
18 'yellow': '\x1b[33m',
19 'blue': '\x1b[34m',
20 'magenta': '\x1b[35m',
21 'cyan': '\x1b[36m',
22 'white': '\x1b[37m',
23 'reset': '\x1b[0m',
24}
27def cprint(clr, msg):
28 if SCRIPT_NAME:
29 msg = '[' + SCRIPT_NAME + '] ' + msg
30 if clr and sys.stdout.isatty():
31 msg = _COLOR[clr] + msg + _COLOR['reset']
32 sys.stdout.write(msg + '\n')
33 sys.stdout.flush()
36def error(msg):
37 cprint('red', msg)
40def fatal(msg):
41 cprint('red', msg)
42 sys.exit(1)
45def warning(msg):
46 cprint('yellow', msg)
49def info(msg):
50 cprint('cyan', msg)
53##
55def run(cmd):
56 if isinstance(cmd, list):
57 cmd = ' '.join(cmd)
58 cmd = re.sub(r'\s+', ' ', cmd.strip())
59 info(f'> {cmd}')
60 res = subprocess.run(cmd, shell=True, capture_output=False)
61 if res.returncode:
62 fatal(f'COMMAND FAILED, code {res.returncode}')
65def exec(cmd):
66 try:
67 return (
68 subprocess
69 .run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
70 .stdout.decode('utf8').strip()
71 )
72 except Exception as exc:
73 return f'> {cmd} FAILED: {exc}'
76def find_dirs(dirname):
77 if not os.path.isdir(dirname):
78 return
80 de: os.DirEntry
81 for de in os.scandir(dirname):
82 if de.name.startswith('.'):
83 continue
84 if de.is_dir():
85 yield de.path
88def find_files(dirname, pattern=None, deep=True):
89 if not os.path.isdir(dirname):
90 return
92 de: os.DirEntry
93 for de in os.scandir(dirname):
94 if de.name.startswith('.'):
95 continue
96 if de.is_dir() and deep:
97 yield from find_files(de.path, pattern)
98 continue
99 if de.is_file() and (pattern is None or re.search(pattern, de.path)):
100 yield de.path
103def ensure_dir(path, clear=False):
104 os.makedirs(path, exist_ok=True)
105 if clear:
106 shutil.rmtree(path)
107 return path
110def read_file(path):
111 with open(path, 'rt', encoding='utf8') as fp:
112 return fp.read().strip()
115def write_file(path, text):
116 with open(path, 'wt', encoding='utf8') as fp:
117 fp.write(text)
120def parse_args(argv):
121 args = {}
122 opt = None
123 n = 0
125 for a in argv:
126 if a == '-':
127 args['_rest'] = []
128 elif '_rest' in args:
129 args['_rest'].append(a)
130 elif a.startswith('--'):
131 opt = a[2:]
132 args[opt] = True
133 elif a.startswith('-'):
134 opt = a[1:]
135 args[opt] = True
136 elif opt:
137 args[opt] = a
138 opt = None
139 else:
140 args[n] = a
141 n += 1
143 return args
146def main(name, main_fn, usage):
147 global SCRIPT_NAME
149 SCRIPT_NAME = name
151 args = parse_args(sys.argv)
152 if not args or 'h' in args or 'help' in args:
153 print('\n' + usage.strip() + '\n')
154 sys.exit(0)
156 try:
157 sys.exit(main_fn(args))
158 except KeyboardInterrupt:
159 pass
160 except Exception as exc:
161 error('INTERNAL ERROR')
162 error(traceback.format_exc())
165def text_table(data, header=None, delim=' | '):
166 """Format a list of dicts as a text-mode table."""
168 data = list(data)
170 if not data:
171 return ''
173 is_dict = isinstance(data[0], dict)
175 print_header = header is not None
176 if header is None or header == 'auto':
177 header = data[0].keys() if is_dict else list(range(len(data[0])))
179 widths = [len(h) if print_header else 1 for h in header]
181 def get(d, h):
182 if is_dict:
183 return d.get(h, '')
184 try:
185 return d[h]
186 except IndexError:
187 return ''
189 for d in data:
190 widths = [
191 max(a, b)
192 for a, b in zip(
193 widths,
194 [len(str(get(d, h))) for h in header]
195 )
196 ]
198 def field(n, v):
199 if isinstance(v, (int, float)):
200 return str(v).rjust(widths[n])
201 return str(v).ljust(widths[n])
203 rows = []
205 if print_header:
206 hdr = delim.join(field(n, h) for n, h in enumerate(header))
207 rows.append(hdr)
208 rows.append('-' * len(hdr))
210 for d in data:
211 rows.append(delim.join(field(n, get(d, h)) for n, h in enumerate(header)))
213 return '\n'.join(rows)
216class ProgressIndicator:
217 def __init__(self, title, total=0, resolution=10):
218 self.resolution = resolution
219 self.title = title
220 self.total = total
221 self.progress = 0
222 self.lastd = 0
223 self.starttime = 0
225 def __enter__(self):
226 self.log(f'START ({self.total})' if self.total else 'START')
227 self.starttime = time.time()
228 return self
230 def __exit__(self, exc_type, exc_val, exc_tb):
231 if not exc_type:
232 ts = time.time() - self.starttime
233 self.log(f'END ({ts:.2f} sec)')
235 def update(self, add=1):
236 if not self.total:
237 return
238 self.progress += add
239 p = math.floor(self.progress * 100.0 / self.total)
240 if p > 100:
241 p = 100
242 d = round(p / self.resolution) * self.resolution
243 if d > self.lastd:
244 self.log(f'{d}%')
245 self.lastd = d
247 def log(self, s):
248 info(f'{self.title}: {s}')