Coverage for gws-app/gws/lib/osx/__init__.py: 71%

204 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-16 22:59 +0200

1"""Utilities for os/shell scripting""" 

2 

3from typing import Optional 

4 

5import grp 

6import hashlib 

7import os 

8import pwd 

9import re 

10import shlex 

11import shutil 

12import signal 

13import subprocess 

14import time 

15 

16import psutil 

17 

18import gws 

19 

20 

class Error(gws.Error):
    """Generic error raised by the helpers in this module."""

23 

24 

class TimeoutError(Error):
    """Raised by ``run`` when a command does not finish within its timeout.

    Note: inside this module the name shadows the builtin ``TimeoutError``.
    """

27 

28 

29_Path = str | bytes 

30 

31 

def getenv(key: str, default: str = None) -> Optional[str]:
    """Look up an environment variable.

    Args:
        key: Name of the environment variable.
        default: Value to return when the variable is not set.

    Returns:
        The value of the variable, or ``default`` if it is not set.
    """
    return os.environ.get(key, default)

43 

44 

45def run_nowait(cmd: str | list, **kwargs) -> subprocess.Popen: 

46 """Run a process and return immediately. 

47 

48 Args: 

49 cmd: A process to run. 

50 kwargs: 

51 

52 Returns: 

53 The output of the command. 

54 """ 

55 

56 args = { 

57 'stdin': None, 

58 'stdout': None, 

59 'stderr': None, 

60 'shell': False, 

61 } 

62 args.update(kwargs) 

63 

64 return subprocess.Popen(cmd, **args) 

65 

66 

def run(cmd: str | list, input: str = None, echo: bool = False, strict: bool = True, timeout: float = None, **kwargs) -> str:
    """Run an external command and wait for it to finish.

    Args:
        cmd: Command to run. A string is split into arguments with ``shlex.split``.
        input: Input data, sent to the command's stdin.
        echo: Echo the output instead of capturing it.
        strict: Raise an error on a non-zero exit code.
        timeout: Timeout, passed to ``Popen.communicate``.
        kwargs: Arguments to pass to ``subprocess.Popen``; they override the defaults used here.

    Returns:
        The captured command output as a string (by default stderr is merged into stdout),
        or an empty string if nothing was captured.

    Raises:
        TimeoutError: If the command does not finish within ``timeout``.
        Error: If the command cannot be run, or if it exits with a non-zero code and ``strict`` is set.
    """

    args = {
        'stdin': subprocess.PIPE if input else None,
        'stdout': None if echo else subprocess.PIPE,
        'stderr': subprocess.STDOUT,
        'shell': False,
    }
    args.update(kwargs)

    if isinstance(cmd, str):
        cmd = shlex.split(cmd)

    # Unless the caller asked for a text mode, the pipes are binary
    # and ``communicate`` only accepts bytes.
    text_mode = any(args.get(k) for k in ('text', 'universal_newlines', 'encoding', 'errors'))
    if isinstance(input, str) and not text_mode:
        input = input.encode('utf8')

    gws.log.debug(f'RUN: {cmd=}')

    try:
        p = subprocess.Popen(cmd, **args)
    except Exception as exc:
        raise Error(f'run: failure', repr(cmd)) from exc

    try:
        out, _ = p.communicate(input, timeout)
        rc = p.returncode
    except subprocess.TimeoutExpired as exc:
        # ``communicate`` does not stop the child on timeout:
        # kill it and reap it, so that no stray process or open pipe is left behind.
        p.kill()
        p.communicate()
        raise TimeoutError(f'run: command timed out', repr(cmd)) from exc
    except Exception as exc:
        raise Error(f'run: failure', repr(cmd)) from exc

    if rc:
        gws.log.debug(f'RUN_FAILED: {cmd=} {rc=} {out=}')

    if rc and strict:
        raise Error(f'run: non-zero exit', repr(cmd))

    return _to_str(out or '')

111 

112 

def unlink(path: _Path) -> bool:
    """Delete a file, if it exists.

    Args:
        path: Filepath.

    Returns:
        ``True`` if the file was deleted or the path is not an existing file,
        ``False`` if an ``OSError`` occurred.
    """
    try:
        is_file = os.path.isfile(path)
        if is_file:
            os.unlink(path)
    except OSError as exc:
        gws.log.debug(f'OSError: unlink: {exc}')
        return False
    return True

126 

127 

def rename(src: _Path, dst: _Path):
    """Move the source path to the destination (via ``shutil.move``).

    Args:
        src: Path to source.
        dst: Destination.
    """

    source = _to_str(src)
    target = _to_str(dst)
    shutil.move(source, target)

137 

138 

def chown(path: _Path, user: int = None, group: int = None):
    """Changes the UID and GID for a given path.

    Args:
        path: Filepath.
        user: UID (defaults to ``gws.c.UID``).
        group: GID (defaults to ``gws.c.GID``).
    """
    # Compare against None explicitly: 0 (root) is a valid id and must not trigger the default.
    uid = gws.c.UID if user is None else user
    gid = gws.c.GID if group is None else group
    os.chown(path, uid, gid)

148 

149 

def copy(src: _Path, dst: _Path, user: int = None, group: int = None):
    """Copy a file and set the owner of the copy.

    Args:
        src: Source path.
        dst: Destination path.
        user: UID of the copy (defaults to ``gws.c.UID``).
        group: GID of the copy (defaults to ``gws.c.GID``).
    """
    shutil.copyfile(src, dst)
    # Compare against None explicitly: 0 (root) is a valid id and must not trigger the default.
    uid = gws.c.UID if user is None else user
    gid = gws.c.GID if group is None else group
    os.chown(dst, uid, gid)

161 

162 

def mkdir(path: _Path, mode: int = 0o755, user: int = None, group: int = None):
    """Create a (possibly nested) directory, unless it already exists.

    Args:
        path: Path to a directory.
        mode: Directory creation mode.
        user: Directory user. Accepted, but currently not applied by this function.
        group: Directory group. Accepted, but currently not applied by this function.
    """

    os.makedirs(path, mode=mode, exist_ok=True)

174 

175 

def rmdir(path: _Path) -> bool:
    """Remove a directory together with everything inside it.

    Args:
        path: Path to a directory. Can be non-empty.

    Returns:
        ``True`` if the directory tree was removed, ``False`` if an ``OSError`` occurred.
    """

    try:
        shutil.rmtree(path)
    except OSError as exc:
        gws.log.warning(f'OSError: rmdir: {exc}')
        return False
    return True

189 

190 

def file_mtime(path: _Path) -> float:
    """Return the modification time of a path.

    Args:
        path: File-/directory-path.

    Returns:
        The ``st_mtime`` value of the path (seconds since the epoch),
        or ``-1`` if the path cannot be stat'ed.
    """
    try:
        info = os.stat(path)
    except OSError as exc:
        gws.log.debug(f'OSError: file_mtime: {exc}')
        return -1
    return info.st_mtime

205 

206 

def file_age(path: _Path) -> int:
    """Return how many seconds ago a path was last modified.

    Args:
        path: Filepath.

    Returns:
        Whole seconds elapsed since the path's modification time,
        or ``-1`` if the path cannot be stat'ed.
    """
    try:
        now = time.time()
        modified = os.stat(path).st_mtime
        return int(now - modified)
    except OSError as exc:
        gws.log.debug(f'OSError: file_age: {exc}')
        return -1

221 

222 

def file_size(path: _Path) -> int:
    """Return the size of a file.

    Args:
        path: Filepath.

    Returns:
        The ``st_size`` value of the path (size in bytes),
        or ``-1`` if the path cannot be stat'ed.
    """
    try:
        info = os.stat(path)
    except OSError as exc:
        gws.log.debug(f'OSError: file_size: {exc}')
        return -1
    return info.st_size

237 

238 

def file_checksum(path: _Path) -> str:
    """Returns the SHA-256 checksum of the file content.

    Args:
        path: Filepath.

    Returns:
        The hexadecimal SHA-256 digest of the file,
        or an empty string if the file cannot be read.
    """
    chunk_size = 1 << 20

    try:
        digest = hashlib.sha256()
        with open(path, 'rb') as fp:
            # hash in chunks, so that large files are not loaded into memory at once
            while chunk := fp.read(chunk_size):
                digest.update(chunk)
        return digest.hexdigest()
    except OSError as exc:
        gws.log.debug(f'OSError: file_checksum: {exc}')
        return ''

254 

255 

def kill_pid(pid: int, sig_name='TERM') -> bool:
    """Send a signal to a process.

    Args:
        pid: Process ID.
        sig_name: Name of the signal in the ``signal`` module,
            with or without the ``SIG`` prefix.

    Returns:
        ``True`` if the signal was sent or no process with the given PID exists,
        ``False`` if sending the signal failed.
    """
    sig = getattr(signal, sig_name, None)
    if not sig:
        sig = getattr(signal, 'SIG' + sig_name)

    try:
        proc = psutil.Process(pid)
        proc.send_signal(sig)
    except psutil.NoSuchProcess:
        # nothing to signal: treated as success
        return True
    except psutil.Error as e:
        gws.log.warning(f'send_signal failed, pid={pid!r}, {e}')
        return False
    return True

275 

276 

def running_pids() -> dict[int, str]:
    """Returns the current pids and the corresponding process' name.

    Processes that disappear while the list is being built are skipped.
    """
    d = {}
    for p in psutil.process_iter():
        try:
            d[p.pid] = p.name()
        except psutil.NoSuchProcess:
            # the process exited between being listed and being queried
            continue
    return d

283 

284 

def process_rss_size(unit: str = 'm') -> float:
    """Return the Resident Set Size of the current process.

    Args:
        unit: ``k`` | ``m`` | ``g``, decimal units (divisors 1e3, 1e6, 1e9).

    Returns:
        The RSS value reported by ``psutil``, divided according to ``unit``.
        For any other ``unit`` the value is returned undivided.
    """
    rss = psutil.Process().memory_info().rss

    for key, divisor in (('k', 1e3), ('m', 1e6), ('g', 1e9)):
        if unit == key:
            return rss / divisor

    return rss

302 

303 

def user_info(uid=None, gid=None) -> dict:
    """Get user and group information.

    Args:
        uid: Optional user ID. Defaults to the current process's user ID.
        gid: Optional group ID. Defaults to the user's primary group ID.

    Returns:
        A dictionary containing user and group information
        (a combination of struct_passwd and struct_group).

    Raises:
        KeyError: If the user or the group is not found (raised by ``pwd`` / ``grp``).
    """

    # Compare against None explicitly: 0 (root) is a valid id and must not trigger the default.
    if uid is None:
        uid = os.getuid()
    u = pwd.getpwuid(uid)

    r = dict(
        pw_name=u.pw_name,
        pw_uid=u.pw_uid,
        pw_gid=u.pw_gid,
        pw_dir=u.pw_dir,
        pw_shell=u.pw_shell,
    )

    if gid is None:
        gid = r['pw_gid']
    g = grp.getgrgid(gid)

    r['gr_name'] = g.gr_name
    r['gr_gid'] = g.gr_gid

    return r

334 

335 

def find_files(dirname: _Path, pattern=None, ext=None, deep: bool = True):
    """Finds files in a given directory.

    Entries whose name starts with a dot are skipped, both files and directories.

    Args:
        dirname: Path to directory.
        pattern: Regular expression, searched in the full path of each file.
        ext: Extension (or a list of extensions) to match, used only if no ``pattern`` is given.
            It is inserted into a regular expression as is.
        deep: If true then searches through all subdirectories for files,
            otherwise it returns the files only in the given directory.

    Returns:
        A generator object, yielding file paths.
    """
    if not pattern and ext:
        if isinstance(ext, (list, tuple)):
            ext = '|'.join(ext)
        pattern = '\\.(' + ext + ')$'

    # The context manager closes the scandir iterator
    # even if the generator is abandoned before it is exhausted.
    with os.scandir(dirname) as entries:
        de: os.DirEntry
        for de in entries:
            if de.name.startswith('.'):
                continue

            if de.is_dir() and deep:
                yield from find_files(de.path, pattern)
                continue

            if de.is_file() and (pattern is None or re.search(pattern, de.path)):
                yield de.path

365 

366 

def find_directories(dirname: _Path, pattern=None, deep: bool = True):
    """Finds all directories in a given directory.

    Directories whose name starts with a dot are skipped, together with their content.

    Args:
        dirname: Path to directory.
        pattern: Regular expression, searched in the full path of each directory.
            Subdirectories are visited even if their parent does not match.
        deep: If true then searches through all subdirectories for directories,
            otherwise it returns the directories only in the given directory.

    Returns:
        A generator object, yielding directory paths.
    """
    # The context manager closes the scandir iterator
    # even if the generator is abandoned before it is exhausted.
    with os.scandir(dirname) as entries:
        de: os.DirEntry
        for de in entries:
            if de.name.startswith('.'):
                continue

            if not de.is_dir():
                continue

            if pattern is None or re.search(pattern, de.path):
                yield de.path

            if deep:
                yield from find_directories(de.path, pattern)

392 

393 

def parse_path(path: _Path) -> dict[str, str]:
    """Split a path into a dict(dirname,filename,name,extension).

    The file name is split at its first dot. A file name starting with a dot
    is taken as the name in full and has an empty extension.

    Args:
        path: Path.

    Returns:
        A dict(dirname,filename,name,extension).
    """

    dirname, filename = os.path.split(_to_str(path))

    if filename.startswith('.'):
        name, extension = filename, ''
    else:
        name, _, extension = filename.partition('.')

    return {
        'dirname': dirname,
        'filename': filename,
        'name': name,
        'extension': extension,
    }

422 

423 

def file_name(path: _Path) -> str:
    """Return the last component of a path.

    Args:
        path: Filepath.

    Returns:
        The filename, i.e. everything after the last path separator.
    """

    _, tail = os.path.split(_to_str(path))
    return tail

436 

437 

def is_abs_path(path: _Path) -> bool:
    """Check whether a path is absolute.

    Args:
        path: A path.

    Returns:
        The result of ``os.path.isabs`` for the path.
    """
    return os.path.isabs(path)

440 

441 

def abs_path(path: _Path, base: _Path) -> str:
    """Make a relative path absolute with respect to a base directory or file path.

    Args:
        path: A path. An absolute path is returned unchanged.
        base: A path to the base. If it is an existing file, its directory is used.

    Raises:
        ``ValueError``: If the path is relative and base is empty.

    Returns:
        The absolute path.
    """

    target = _to_str(path)
    if os.path.isabs(target):
        return target

    if not base:
        raise ValueError('cannot compute abspath without a base')

    base_dir = os.path.dirname(base) if os.path.isfile(base) else base
    joined = os.path.join(_to_str(base_dir), target)
    return os.path.abspath(joined)

468 

469 

def abs_web_path(path: str, basedir: str) -> Optional[str]:
    """Return an absolute path in a base dir and ensure the path is correct.

    The path is split at slashes, empty segments are dropped. Each directory segment
    may only consist of letters, digits, underscores and dashes; the file name may
    additionally have dot-separated suffixes of lowercase letters and digits.

    Args:
        path: Path to absolutize.
        basedir: Path to base directory.

    Returns:
        Absolute path with respect to base directory, or ``None`` if the path is empty,
        contains an invalid segment or does not point to an existing file.
    """

    _dir_re = r'^[A-Za-z0-9_-]+$'
    _fil_re = r'^[A-Za-z0-9_-]+(\.[a-z0-9]+)*$'

    gws.log.debug(f'abs_web_path: trying {path!r} in {basedir!r}')

    stripped = (s.strip() for s in path.split('/'))
    parts = [s for s in stripped if s]

    # An empty or slash-only path has no file name to check.
    if not parts:
        gws.log.warning(f'abs_web_path: empty path={path!r}')
        return None

    *dirs, fname = parts

    if not all(re.match(_dir_re, p) for p in dirs):
        gws.log.warning(f'abs_web_path: invalid dirname in path={path!r}')
        return None

    if not re.match(_fil_re, fname):
        gws.log.warning(f'abs_web_path: invalid filename in path={path!r}')
        return None

    p = '/'.join([basedir, *dirs, fname])

    if not os.path.isfile(p):
        gws.log.warning(f'abs_web_path: not a file path={path!r}')
        return None

    return p

512 

513 

def rel_path(path: _Path, base: _Path) -> str:
    """Make a path relative with respect to a base directory or file path.

    Args:
        path: Path to relativize.
        base: Path to the base. If it is an existing file, its directory is used.

    Returns:
        Relativized path with respect to base directory.
    """

    base_dir = os.path.dirname(base) if os.path.isfile(base) else base
    return os.path.relpath(_to_str(path), start=_to_str(base_dir))

530 

531 

532def _to_str(p: _Path) -> str: 

533 return p if isinstance(p, str) else bytes(p).decode('utf8') 

534 

535 

536def _to_bytes(p: _Path) -> bytes: 

537 return p if isinstance(p, bytes) else str(p).encode('utf8')