Coverage for gws-app / gws / lib / osx / __init__.py: 71%

213 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-03 10:12 +0100

1"""Utilities for os/shell scripting""" 

2 

3from typing import Optional 

4 

5import grp 

6import hashlib 

7import os 

8import pwd 

9import re 

10import shlex 

11import shutil 

12import signal 

13import subprocess 

14import time 

15 

16import psutil 

17 

18import gws 

19 

20 

21class Error(gws.Error): 

22 pass 

23 

24 

25class TimeoutError(Error): 

26 pass 

27 

28 

29_Path = str | bytes 

30 

31 

32def getenv(key: str, default: str = None) -> Optional[str]: 

33 """Returns the value for a given environment-variable. 

34 

35 Args: 

36 key: An environment-variable. 

37 default: The default return. 

38 

39 Returns: 

40 ``default`` if no key has been found, if there is such key then the value for the environment-variable is returned. 

41 """ 

42 return os.getenv(key, default) 

43 

44 

45def run_nowait(cmd: str | list, **kwargs) -> subprocess.Popen: 

46 """Run a process and return immediately. 

47 

48 Args: 

49 cmd: A process to run. 

50 kwargs: 

51 

52 Returns: 

53 The output of the command. 

54 """ 

55 

56 args = { 

57 'stdin': None, 

58 'stdout': None, 

59 'stderr': None, 

60 'shell': False, 

61 } 

62 args.update(kwargs) 

63 

64 return subprocess.Popen(cmd, **args) 

65 

66 

67def run(cmd: str | list, input: str = None, echo: bool = False, strict: bool = True, timeout: float = None, **kwargs) -> str: 

68 """Run an external command. 

69 

70 Args: 

71 cmd: Command to run. 

72 input: Input data. 

73 echo: Echo the output instead of capturing it. 

74 strict: Raise an error on a non-zero exit code. 

75 timeout: Timeout. 

76 kwargs: Arguments to pass to ``subprocess.Popen``. 

77 

78 Returns: 

79 The command output. 

80 """ 

81 

82 args = { 

83 'stdin': subprocess.PIPE if input else None, 

84 'stdout': None if echo else subprocess.PIPE, 

85 'stderr': subprocess.STDOUT, 

86 'shell': False, 

87 } 

88 args.update(kwargs) 

89 

90 if isinstance(cmd, str): 

91 cmd = shlex.split(cmd) 

92 

93 gws.log.debug(f'RUN: {cmd=}') 

94 

95 try: 

96 p = subprocess.Popen(cmd, **args) 

97 out, _ = p.communicate(input, timeout) 

98 rc = p.returncode 

99 except subprocess.TimeoutExpired as exc: 

100 raise TimeoutError(f'run: command timed out', repr(cmd)) from exc 

101 except Exception as exc: 

102 raise Error(f'run: failure', repr(cmd)) from exc 

103 

104 if rc: 

105 gws.log.debug(f'RUN_FAILED: {cmd=} {rc=} {out=}') 

106 

107 if rc and strict: 

108 raise Error(f'run: non-zero exit', repr(cmd)) 

109 

110 return _to_str(out or '') 

111 

112 

113def unlink(path: _Path) -> bool: 

114 """Deletes a given path. 

115 

116 Args: 

117 path: Filepath. 

118 """ 

119 try: 

120 if os.path.isfile(path): 

121 os.unlink(path) 

122 return True 

123 except OSError as exc: 

124 gws.log.debug(f'OSError: unlink: {exc}') 

125 return False 

126 

127 

128def rename(src: _Path, dst: _Path): 

129 """Moves and renames the source path according to the given destination. 

130 

131 Args: 

132 src: Path to source. 

133 dst: Destination. 

134 """ 

135 

136 shutil.move(_to_str(src), _to_str(dst)) 

137 

138 

139def chown(path: _Path, user: int = None, group: int = None): 

140 """Changes the UID or GID for a given path. 

141 

142 Args: 

143 path: Filepath. 

144 user: UID. 

145 group: GID. 

146 """ 

147 os.chown(path, user or gws.c.UID, group or gws.c.GID) 

148 

149 

150def copy(src: _Path, dst: _Path, user: int = None, group: int = None): 

151 """Copy a file. 

152 

153 Args: 

154 src: Source path. 

155 dst: Destination path. 

156 user: UID. 

157 group: GID. 

158 """ 

159 shutil.copyfile(src, dst) 

160 os.chown(dst, user or gws.c.UID, group or gws.c.GID) 

161 

162 

163def mkdir(path: _Path, mode: int = 0o755, user: int = None, group: int = None): 

164 """Check a (possibly nested) directory. 

165 

166 Args: 

167 path: Path to a directory. 

168 mode: Directory creation mode. 

169 user: Directory user (defaults to gws.c.UID) 

170 group: Directory group (defaults to gws.c.GID) 

171 """ 

172 

173 os.makedirs(path, mode, exist_ok=True) 

174 

175 

176def rmdir(path: _Path) -> bool: 

177 """Remove a directory or a directory tree. 

178 

179 Args: 

180 path: Path to a directory. Can be non-empty 

181 """ 

182 

183 try: 

184 shutil.rmtree(path) 

185 return True 

186 except OSError as exc: 

187 gws.log.warning(f'OSError: rmdir: {exc}') 

188 return False 

189 

190 

191def touch(path: _Path): 

192 """Update the access and modification times of the file to the current time. 

193 If the file does not exist, it is created. 

194 

195 Args: 

196 path: Filepath. 

197 """ 

198 with open(path, 'a'): 

199 os.utime(path, None) 

200 

201 

202def file_mtime(path: _Path) -> float: 

203 """Returns the time from epoch when the path was recently changed. 

204 

205 Args: 

206 path: File-/directory-path. 

207 

208 Returns: 

209 Time since epoch in seconds until most recent change in file. 

210 """ 

211 try: 

212 return os.stat(path).st_mtime 

213 except OSError as exc: 

214 gws.log.debug(f'OSError: file_mtime: {exc}') 

215 return -1 

216 

217 

218def file_age(path: _Path) -> int: 

219 """Returns the amount of seconds since the path has been changed. 

220 

221 Args: 

222 path: Filepath. 

223 

224 Returns: 

225 Amount of seconds since most recent change in file, if the path is invalid ``-1`` is returned. 

226 """ 

227 try: 

228 return int(time.time() - os.stat(path).st_mtime) 

229 except OSError as exc: 

230 gws.log.debug(f'OSError: file_age: {exc}') 

231 return -1 

232 

233 

234def file_size(path: _Path) -> int: 

235 """Returns the file size. 

236 

237 Args: 

238 path: Filepath. 

239 

240 Returns: 

241 Amount of characters in the file or ``-1`` if the path is invalid. 

242 """ 

243 try: 

244 return os.stat(path).st_size 

245 except OSError as exc: 

246 gws.log.debug(f'OSError: file_size: {exc}') 

247 return -1 

248 

249 

250def file_checksum(path: _Path) -> str: 

251 """Returns the checksum of the file. 

252 

253 Args: 

254 path: Filepath. 

255 

256 Returns: 

257 Empty string if the path is invalid, otherwise the file's checksum. 

258 """ 

259 try: 

260 with open(path, 'rb') as fp: 

261 return hashlib.sha256(fp.read()).hexdigest() 

262 except OSError as exc: 

263 gws.log.debug(f'OSError: file_checksum: {exc}') 

264 return '' 

265 

266 

267def kill_pid(pid: int, sig_name='TERM') -> bool: 

268 """Kills a process. 

269 

270 Args: 

271 pid: Process ID. 

272 sig_name: 

273 

274 Returns: 

275 ``True`` if the process with the given PID is killed or does not exist.``False `` if the process could not be killed. 

276 """ 

277 sig = getattr(signal, sig_name, None) or getattr(signal, 'SIG' + sig_name) 

278 try: 

279 psutil.Process(pid).send_signal(sig) 

280 return True 

281 except psutil.NoSuchProcess: 

282 return True 

283 except psutil.Error as e: 

284 gws.log.warning(f'send_signal failed, pid={pid!r}, {e}') 

285 return False 

286 

287 

288def running_pids() -> dict[int, str]: 

289 """Returns the current pids and the corresponding process' name.""" 

290 d = {} 

291 for p in psutil.process_iter(): 

292 d[p.pid] = p.name() 

293 return d 

294 

295 

296def process_rss_size(unit: str = 'm') -> float: 

297 """Returns the Resident Set Size. 

298 

299 Args: 

300 unit: ``m`` | ``k`` | ``g`` 

301 

302 Returns: 

303 The Resident Set Size with the given unit. 

304 """ 

305 n = psutil.Process().memory_info().rss 

306 if unit == 'k': 

307 return n / 1e3 

308 if unit == 'm': 

309 return n / 1e6 

310 if unit == 'g': 

311 return n / 1e9 

312 return n 

313 

314 

315def user_info(uid=None, gid=None) -> dict: 

316 """Get user and group information. 

317 

318 Args: 

319 uid: Optional user ID. Defaults to the current process's user ID. 

320 gid: Optional group ID. Defaults to the user's primary group ID. 

321 

322 Returns: 

323 A dictionary containing user and group information 

324 (a combination of struct_passwd and struct_group). 

325 """ 

326 

327 uid = uid or os.getuid() 

328 u = pwd.getpwuid(uid) 

329 

330 r = dict( 

331 pw_name=u.pw_name, 

332 pw_uid=u.pw_uid, 

333 pw_gid=u.pw_gid, 

334 pw_dir=u.pw_dir, 

335 pw_shell=u.pw_shell, 

336 ) 

337 

338 gid = gid or r['pw_gid'] 

339 g = grp.getgrgid(gid) 

340 

341 r['gr_name'] = g.gr_name 

342 r['gr_gid'] = g.gr_gid 

343 

344 return r 

345 

346 

347def find_files(dirname: _Path, pattern=None, ext=None, deep: bool = True): 

348 """Finds files in a given directory. 

349 

350 Args: 

351 dirname: Path to directory. 

352 pattern: Pattern to match. 

353 ext: extension to match. 

354 deep: If true then searches through all subdirectories for files, 

355 otherwise it returns the files only in the given directory. 

356 

357 Returns: 

358 A generator object. 

359 """ 

360 if not pattern and ext: 

361 if isinstance(ext, (list, tuple)): 

362 ext = '|'.join(ext) 

363 pattern = '\\.(' + ext + ')$' 

364 

365 de: os.DirEntry 

366 for de in os.scandir(dirname): 

367 if de.name.startswith('.'): 

368 continue 

369 

370 if de.is_dir() and deep: 

371 yield from find_files(de.path, pattern) 

372 continue 

373 

374 if de.is_file() and (pattern is None or re.search(pattern, de.path)): 

375 yield de.path 

376 

377 

378def find_directories(dirname: _Path, pattern=None, deep: bool = True): 

379 """Finds all directories in a given directory. 

380 

381 Args: 

382 dirname: Path to directory. 

383 pattern: Pattern to match. 

384 deep: If true then searches through all subdirectories for directories, 

385 otherwise it returns the directories only in the given directory. 

386 

387 Returns: 

388 A generator object. 

389 """ 

390 de: os.DirEntry 

391 for de in os.scandir(dirname): 

392 if de.name.startswith('.'): 

393 continue 

394 

395 if not de.is_dir(): 

396 continue 

397 

398 if pattern is None or re.search(pattern, de.path): 

399 yield de.path 

400 

401 if deep: 

402 yield from find_directories(de.path, pattern) 

403 

404 

405class ParsePathResult(gws.Data): 

406 path: str 

407 dirname: str 

408 filename: str 

409 stem: str 

410 extension: str 

411 

412 

413def parse_path(path: _Path) -> ParsePathResult: 

414 """Parse a file path into a ParsePathResult object.""" 

415 

416 str_path = _to_str(path) 

417 sp = os.path.split(str_path) 

418 

419 pp = ParsePathResult( 

420 path=str_path, 

421 dirname='', 

422 filename='', 

423 stem='', 

424 extension='', 

425 ) 

426 pp.dirname = sp[0] 

427 pp.filename = sp[1] 

428 

429 if pp.filename.startswith('.'): 

430 pp.stem = pp.filename 

431 else: 

432 pp.stem, _, pp.extension = pp.filename.partition('.') 

433 

434 return pp 

435 

436 

437def file_name(path: _Path) -> str: 

438 """Returns the filename. 

439 

440 Args: 

441 path: Filepath. 

442 

443 Returns: 

444 The filename. 

445 """ 

446 

447 sp = os.path.split(_to_str(path)) 

448 return sp[1] 

449 

450 

451def is_abs_path(path: _Path) -> bool: 

452 return os.path.isabs(path) 

453 

454 

455def abs_path(path: _Path, base: _Path) -> str: 

456 """Absolutize a relative path with respect to a base directory or file path. 

457 

458 Args: 

459 path: A path. 

460 base: A path to the base. 

461 

462 Raises: 

463 ``ValueError``: If base is empty 

464 

465 Returns: 

466 The absolute path. 

467 """ 

468 

469 str_path = _to_str(path) 

470 

471 if os.path.isabs(str_path): 

472 return str_path 

473 

474 if not base: 

475 raise ValueError('cannot compute abspath without a base') 

476 

477 if os.path.isfile(base): 

478 base = os.path.dirname(base) 

479 

480 return os.path.abspath(os.path.join(_to_str(base), str_path)) 

481 

482 

483def abs_web_path(path: str, basedir: str) -> Optional[str]: 

484 """Return an absolute path in a base dir and ensure the path is correct. 

485 

486 Args: 

487 path: Path to absolutize. 

488 basedir: Path to base directory. 

489 

490 Returns: 

491 Absolute path with respect to base directory. 

492 """ 

493 

494 _dir_re = r'^[A-Za-z0-9_-]+$' 

495 _fil_re = r'^[A-Za-z0-9_-]+(\.[a-z0-9]+)*$' 

496 

497 gws.log.debug(f'abs_web_path: trying {path!r} in {basedir!r}') 

498 

499 dirs = [] 

500 for s in path.split('/'): 

501 s = s.strip() 

502 if s: 

503 dirs.append(s) 

504 

505 fname = dirs.pop() 

506 

507 if not all(re.match(_dir_re, p) for p in dirs): 

508 gws.log.warning(f'abs_web_path: invalid dirname in path={path!r}') 

509 return 

510 

511 if not re.match(_fil_re, fname): 

512 gws.log.warning(f'abs_web_path: invalid filename in path={path!r}') 

513 return 

514 

515 p = basedir 

516 if dirs: 

517 p += '/' + '/'.join(dirs) 

518 p += '/' + fname 

519 

520 if not os.path.isfile(p): 

521 gws.log.warning(f'abs_web_path: not a file path={path!r}') 

522 return 

523 

524 return p 

525 

526 

527def rel_path(path: _Path, base: _Path) -> str: 

528 """Relativize an absolute path with respect to a base directory or file path. 

529 

530 Args: 

531 path: Path to relativize. 

532 base: Path to base directory. 

533 

534 Returns: 

535 Relativized path with respect to base directory. 

536 

537 """ 

538 

539 if os.path.isfile(base): 

540 base = os.path.dirname(base) 

541 

542 return os.path.relpath(_to_str(path), _to_str(base)) 

543 

544 

545def _to_str(p: _Path) -> str: 

546 return p if isinstance(p, str) else bytes(p).decode('utf8') 

547 

548 

549def _to_bytes(p: _Path) -> bytes: 

550 return p if isinstance(p, bytes) else str(p).encode('utf8')