Coverage for gws-app / gws / lib / osx / __init__.py: 71%
213 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-03 10:12 +0100
1"""Utilities for os/shell scripting"""
3from typing import Optional
5import grp
6import hashlib
7import os
8import pwd
9import re
10import shlex
11import shutil
12import signal
13import subprocess
14import time
16import psutil
18import gws
class Error(gws.Error):
    """Base class for the errors raised by the os/shell utilities in this module."""
class TimeoutError(Error):
    """Raised by ``run`` when a command does not finish within its timeout.

    Inside this module the name shadows the builtin ``TimeoutError``.
    """
29_Path = str | bytes
def getenv(key: str, default: str = None) -> Optional[str]:
    """Look up an environment variable.

    Args:
        key: Name of the environment variable.
        default: Value to return when the variable is not set.

    Returns:
        The value of the variable, or ``default`` if the variable is not set.
    """

    return os.environ.get(key, default)
45def run_nowait(cmd: str | list, **kwargs) -> subprocess.Popen:
46 """Run a process and return immediately.
48 Args:
49 cmd: A process to run.
50 kwargs:
52 Returns:
53 The output of the command.
54 """
56 args = {
57 'stdin': None,
58 'stdout': None,
59 'stderr': None,
60 'shell': False,
61 }
62 args.update(kwargs)
64 return subprocess.Popen(cmd, **args)
def run(cmd: str | list, input: str = None, echo: bool = False, strict: bool = True, timeout: float = None, **kwargs) -> str:
    """Run an external command and wait until it finishes.

    Args:
        cmd: Command to run. A string is split into arguments with ``shlex.split``.
        input: Input data to send to the command's stdin.
        echo: Echo the output instead of capturing it.
        strict: Raise an error on a non-zero exit code.
        timeout: Timeout in seconds, ``None`` to wait without a limit.
        kwargs: Arguments to pass to ``subprocess.Popen``, they override the defaults used here.

    Returns:
        The command output (stderr is merged into stdout by default),
        or an empty string if no output was captured.

    Raises:
        TimeoutError: If the command did not finish within ``timeout``.
        Error: If the command could not be run, or if it exited
            with a non-zero code and ``strict`` is set.
    """

    args = {
        'stdin': subprocess.PIPE if input else None,
        'stdout': None if echo else subprocess.PIPE,
        'stderr': subprocess.STDOUT,
        'shell': False,
    }
    args.update(kwargs)

    if isinstance(cmd, str):
        cmd = shlex.split(cmd)

    # Unless a text mode was requested through kwargs, the pipes are binary
    # and ``communicate`` rejects str input, so it has to be encoded first.
    text_mode = any(args.get(k) for k in ('text', 'universal_newlines', 'encoding', 'errors'))
    if isinstance(input, str) and not text_mode:
        input = input.encode('utf8')

    gws.log.debug(f'RUN: {cmd=}')

    try:
        p = subprocess.Popen(cmd, **args)
    except Exception as exc:
        raise Error('run: failure', repr(cmd)) from exc

    try:
        out, _ = p.communicate(input, timeout)
    except subprocess.TimeoutExpired as exc:
        # ``communicate`` does not stop the child when the timeout expires:
        # kill it and collect its exit status, otherwise it keeps running.
        p.kill()
        p.communicate()
        raise TimeoutError('run: command timed out', repr(cmd)) from exc
    except Exception as exc:
        raise Error('run: failure', repr(cmd)) from exc

    rc = p.returncode

    if rc:
        gws.log.debug(f'RUN_FAILED: {cmd=} {rc=} {out=}')

    if rc and strict:
        raise Error('run: non-zero exit', repr(cmd))

    # ``out`` is None when the output was echoed rather than captured
    return _to_str(out or '')
def unlink(path: _Path) -> bool:
    """Delete a file. Paths that are not regular files are left untouched.

    Args:
        path: Filepath.

    Returns:
        ``False`` if removing the file failed (the error is logged), ``True`` otherwise.
    """

    try:
        is_regular_file = os.path.isfile(path)
        if is_regular_file:
            os.remove(path)
    except OSError as exc:
        gws.log.debug(f'OSError: unlink: {exc}')
        return False
    return True
def rename(src: _Path, dst: _Path):
    """Move the source path to the destination, renaming it if necessary.

    Args:
        src: Path to source.
        dst: Destination.
    """

    source, target = _to_str(src), _to_str(dst)
    shutil.move(source, target)
def chown(path: _Path, user: int = None, group: int = None):
    """Change the owner and the group of a given path.

    Args:
        path: Filepath.
        user: UID, defaults to ``gws.c.UID``.
        group: GID, defaults to ``gws.c.GID``.
    """

    # Test against None explicitly: 0 is a valid id (root)
    # and must not be replaced by the default.
    uid = gws.c.UID if user is None else user
    gid = gws.c.GID if group is None else group
    os.chown(path, uid, gid)
def copy(src: _Path, dst: _Path, user: int = None, group: int = None):
    """Copy the contents of a file and set the ownership of the copy.

    Args:
        src: Source path.
        dst: Destination path.
        user: UID for the copy, defaults to ``gws.c.UID``.
        group: GID for the copy, defaults to ``gws.c.GID``.
    """

    shutil.copyfile(src, dst)

    # Test against None explicitly: 0 is a valid id (root)
    # and must not be replaced by the default.
    uid = gws.c.UID if user is None else user
    gid = gws.c.GID if group is None else group
    os.chown(dst, uid, gid)
def mkdir(path: _Path, mode: int = 0o755, user: int = None, group: int = None):
    """Create a (possibly nested) directory, doing nothing if it already exists.

    Args:
        path: Path to a directory.
        mode: Directory creation mode.
        user: Directory user. Ownership is only changed if ``user`` or ``group``
            is given, a missing ``user`` then defaults to gws.c.UID.
        group: Directory group. Ownership is only changed if ``user`` or ``group``
            is given, a missing ``group`` then defaults to gws.c.GID.
    """

    os.makedirs(path, mode, exist_ok=True)

    # Honour an explicitly requested owner. Without one the directory keeps
    # the ownership it was created with.
    if user is not None or group is not None:
        uid = gws.c.UID if user is None else user
        gid = gws.c.GID if group is None else group
        os.chown(path, uid, gid)
def rmdir(path: _Path) -> bool:
    """Remove a directory together with everything it contains.

    Args:
        path: Path to a directory. Can be non-empty.

    Returns:
        ``True`` if the directory was removed, ``False`` if removal
        failed with an ``OSError`` (which is logged).
    """

    try:
        shutil.rmtree(path)
    except OSError as exc:
        gws.log.warning(f'OSError: rmdir: {exc}')
        return False
    else:
        return True
def touch(path: _Path):
    """Set the access and modification times of a file to the current time.

    A file that does not exist yet is created empty.

    Args:
        path: Filepath.
    """

    # Append mode creates a missing file and never truncates an existing one.
    with open(path, 'a'):
        os.utime(path, times=None)
def file_mtime(path: _Path) -> float:
    """Return the modification time of a path.

    Args:
        path: File-/directory-path.

    Returns:
        The modification time in seconds since the epoch,
        or ``-1`` if the path cannot be examined.
    """

    try:
        return os.path.getmtime(path)
    except OSError as exc:
        gws.log.debug(f'OSError: file_mtime: {exc}')
    return -1
def file_age(path: _Path) -> int:
    """Return the number of seconds that passed since a path was last modified.

    Args:
        path: Filepath.

    Returns:
        Whole seconds since the last modification,
        or ``-1`` if the path cannot be examined.
    """

    try:
        modified = os.stat(path).st_mtime
    except OSError as exc:
        gws.log.debug(f'OSError: file_age: {exc}')
        return -1
    return int(time.time() - modified)
def file_size(path: _Path) -> int:
    """Return the size of a file.

    Args:
        path: Filepath.

    Returns:
        The size in bytes, or ``-1`` if the path cannot be examined.
    """

    try:
        return os.path.getsize(path)
    except OSError as exc:
        gws.log.debug(f'OSError: file_size: {exc}')
    return -1
def file_checksum(path: _Path) -> str:
    """Return the SHA-256 checksum of a file.

    Args:
        path: Filepath.

    Returns:
        The hexadecimal digest of the file contents,
        or an empty string if the file cannot be read.
    """

    digest = hashlib.sha256()
    try:
        with open(path, 'rb') as fp:
            # Hash in chunks so that large files are never loaded into memory as a whole.
            while chunk := fp.read(65536):
                digest.update(chunk)
    except OSError as exc:
        gws.log.debug(f'OSError: file_checksum: {exc}')
        return ''
    return digest.hexdigest()
def kill_pid(pid: int, sig_name='TERM') -> bool:
    """Send a signal to a process.

    Args:
        pid: Process ID.
        sig_name: Name of the signal, with or without the ``SIG`` prefix
            (e.g. ``TERM`` or ``SIGTERM``).

    Returns:
        ``True`` if the signal was sent or the process does not exist,
        ``False`` if sending the signal failed.
    """

    sig = getattr(signal, sig_name, None)
    if not sig:
        # not found under the given name, retry with the ``SIG`` prefix
        sig = getattr(signal, 'SIG' + sig_name)

    try:
        psutil.Process(pid).send_signal(sig)
    except psutil.NoSuchProcess:
        return True
    except psutil.Error as e:
        gws.log.warning(f'send_signal failed, pid={pid!r}, {e}')
        return False
    return True
def running_pids() -> dict[int, str]:
    """Return the ids of the currently running processes, mapped to the process names.

    Processes that terminate while the list is being built are left out.
    """

    pids = {}
    for proc in psutil.process_iter():
        try:
            pids[proc.pid] = proc.name()
        except psutil.NoSuchProcess:
            # the process ended between the enumeration and the name lookup
            continue
    return pids
def process_rss_size(unit: str = 'm') -> float:
    """Return the Resident Set Size reported by ``psutil.Process()``.

    Args:
        unit: ``m`` | ``k`` | ``g``

    Returns:
        The Resident Set Size divided by 1e3 (``k``), 1e6 (``m``) or 1e9 (``g``).
        For any other unit the size in bytes is returned unchanged.
    """

    rss = psutil.Process().memory_info().rss
    divisors = {'k': 1e3, 'm': 1e6, 'g': 1e9}
    if unit in divisors:
        return rss / divisors[unit]
    return rss
def user_info(uid=None, gid=None) -> dict:
    """Get user and group information.

    Args:
        uid: Optional user ID. Defaults to the current process's user ID.
        gid: Optional group ID. Defaults to the user's primary group ID.

    Returns:
        A dictionary containing user and group information
        (a combination of struct_passwd and struct_group).

    Raises:
        KeyError: If there is no passwd or group entry for the given ID.
    """

    # Test against None explicitly: 0 is a valid id (root)
    # and must not be replaced by the defaults.
    if uid is None:
        uid = os.getuid()
    u = pwd.getpwuid(uid)

    r = dict(
        pw_name=u.pw_name,
        pw_uid=u.pw_uid,
        pw_gid=u.pw_gid,
        pw_dir=u.pw_dir,
        pw_shell=u.pw_shell,
    )

    if gid is None:
        gid = u.pw_gid
    g = grp.getgrgid(gid)

    r['gr_name'] = g.gr_name
    r['gr_gid'] = g.gr_gid

    return r
def find_files(dirname: _Path, pattern=None, ext=None, deep: bool = True):
    """Find files in a given directory, skipping names that start with a dot.

    Args:
        dirname: Path to directory.
        pattern: Regular expression, searched in the path of each file.
        ext: Extension or a list of extensions to match. Only used if no pattern is given.
        deep: If true then searches through all subdirectories for files,
            otherwise it returns the files only in the given directory.

    Returns:
        A generator of file paths.
    """

    if ext and not pattern:
        alternatives = '|'.join(ext) if isinstance(ext, (list, tuple)) else ext
        pattern = '\\.(' + alternatives + ')$'

    entry: os.DirEntry
    for entry in os.scandir(dirname):
        if entry.name.startswith('.'):
            continue

        if entry.is_dir() and deep:
            # the extensions are already part of the pattern at this point
            yield from find_files(entry.path, pattern)
        elif entry.is_file() and (pattern is None or re.search(pattern, entry.path)):
            yield entry.path
def find_directories(dirname: _Path, pattern=None, deep: bool = True):
    """Find directories in a given directory, skipping names that start with a dot.

    Args:
        dirname: Path to directory.
        pattern: Regular expression, searched in the path of each directory.
        deep: If true then searches through all subdirectories for directories,
            otherwise it returns the directories only in the given directory.

    Returns:
        A generator of directory paths.
    """

    entry: os.DirEntry
    for entry in os.scandir(dirname):
        hidden = entry.name.startswith('.')
        if hidden or not entry.is_dir():
            continue

        matches = pattern is None or re.search(pattern, entry.path)
        if matches:
            yield entry.path

        # a directory is descended into even if it does not match the pattern itself
        if deep:
            yield from find_directories(entry.path, pattern)
class ParsePathResult(gws.Data):
    """Components of a file path, as returned by ``parse_path``."""

    # the complete path, as a string
    path: str
    # everything before the last path component
    dirname: str
    # the last path component
    filename: str
    # the filename up to its first dot (the whole filename if it starts with a dot)
    stem: str
    # the part of the filename after its first dot, empty if there is none
    extension: str
def parse_path(path: _Path) -> ParsePathResult:
    """Parse a file path into a ParsePathResult object.

    The filename is divided at its first dot, so ``a.tar.gz`` has the stem ``a``
    and the extension ``tar.gz``. A filename starting with a dot is taken
    as a stem without an extension.

    Args:
        path: Filepath.

    Returns:
        The components of the path.
    """

    text = _to_str(path)
    head, tail = os.path.split(text)

    if tail.startswith('.'):
        stem, extension = tail, ''
    else:
        stem, _, extension = tail.partition('.')

    return ParsePathResult(
        path=text,
        dirname=head,
        filename=tail,
        stem=stem,
        extension=extension,
    )
def file_name(path: _Path) -> str:
    """Return the last component of a path.

    Args:
        path: Filepath.

    Returns:
        The filename, which is empty if the path ends with a separator.
    """

    return os.path.basename(_to_str(path))
def is_abs_path(path: _Path) -> bool:
    """Check whether a path is absolute.

    Args:
        path: A path.

    Returns:
        ``True`` if ``os.path.isabs`` considers the path absolute.
    """

    return os.path.isabs(path)
def abs_path(path: _Path, base: _Path) -> str:
    """Absolutize a relative path with respect to a base directory or file path.

    Args:
        path: A path. An absolute path is returned as it is.
        base: A path to the base. If it is an existing file, its directory is used.

    Raises:
        ``ValueError``: If the path is relative and the base is empty.

    Returns:
        The absolute path.
    """

    text = _to_str(path)
    if os.path.isabs(text):
        return text

    if not base:
        raise ValueError('cannot compute abspath without a base')

    anchor = os.path.dirname(base) if os.path.isfile(base) else base
    joined = os.path.join(_to_str(anchor), text)
    return os.path.abspath(joined)
def abs_web_path(path: str, basedir: str) -> Optional[str]:
    """Return an absolute path in a base dir and ensure the path is correct.

    The path is split at slashes. Each directory component may only consist of
    letters, digits, ``_`` and ``-``, and the filename must be such a name
    optionally followed by lowercase dot-extensions. Components like ``..``
    are therefore rejected.

    Args:
        path: Path to absolutize.
        basedir: Path to base directory.

    Returns:
        Absolute path with respect to base directory, or ``None`` if the path
        is empty, has an invalid component or does not point to an existing file.
    """

    _dir_re = r'^[A-Za-z0-9_-]+$'
    _fil_re = r'^[A-Za-z0-9_-]+(\.[a-z0-9]+)*$'

    gws.log.debug(f'abs_web_path: trying {path!r} in {basedir!r}')

    # drop empty components, e.g. those produced by leading or doubled slashes
    parts = [s for s in (part.strip() for part in path.split('/')) if s]

    # an empty or slash-only path has no filename to validate
    if not parts:
        gws.log.warning(f'abs_web_path: empty path={path!r}')
        return None

    *dirs, fname = parts

    if not all(re.match(_dir_re, p) for p in dirs):
        gws.log.warning(f'abs_web_path: invalid dirname in path={path!r}')
        return None

    if not re.match(_fil_re, fname):
        gws.log.warning(f'abs_web_path: invalid filename in path={path!r}')
        return None

    p = basedir
    if dirs:
        p += '/' + '/'.join(dirs)
    p += '/' + fname

    if not os.path.isfile(p):
        gws.log.warning(f'abs_web_path: not a file path={path!r}')
        return None

    return p
def rel_path(path: _Path, base: _Path) -> str:
    """Relativize a path with respect to a base directory or file path.

    Args:
        path: Path to relativize.
        base: Path to base directory. If it is an existing file, its directory is used.

    Returns:
        Relativized path with respect to base directory.
    """

    anchor = os.path.dirname(base) if os.path.isfile(base) else base
    return os.path.relpath(_to_str(path), _to_str(anchor))
def _to_str(p: _Path) -> str:
    """Return a path as a string, decoding non-string values as UTF-8 bytes."""

    if isinstance(p, str):
        return p
    return bytes(p).decode('utf8')
def _to_bytes(p: _Path) -> bytes:
    """Return a path as bytes, encoding the string form of non-bytes values as UTF-8."""

    if isinstance(p, bytes):
        return p
    return str(p).encode('utf8')