Coverage for gws-app/gws/lib/osx/__init__.py: 71%
204 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""Utilities for os/shell scripting"""
3from typing import Optional
5import grp
6import hashlib
7import os
8import pwd
9import re
10import shlex
11import shutil
12import signal
13import subprocess
14import time
16import psutil
18import gws
class Error(gws.Error):
    """Error raised by the os/shell helpers in this module (e.g. by ``run``)."""
class TimeoutError(Error):
    """Raised by ``run`` when a command does not finish within its timeout.

    Note that inside this module the name shadows the builtin ``TimeoutError``.
    """
29_Path = str | bytes
def getenv(key: str, default: str = None) -> Optional[str]:
    """Look up an environment variable.

    Args:
        key: Name of the environment variable.
        default: Value to return when the variable is not set.

    Returns:
        The value of the variable, or ``default`` if the variable is not set.
    """
    return os.environ.get(key, default)
def run_nowait(cmd: str | list, **kwargs) -> subprocess.Popen:
    """Start a process and return without waiting for it to finish.

    Args:
        cmd: Command to start, handed to ``subprocess.Popen`` unchanged.
        kwargs: Extra arguments for ``subprocess.Popen``. They override the defaults
            used here (no stdin/stdout/stderr redirection, ``shell=False``).

    Returns:
        The ``subprocess.Popen`` object of the started process.
    """

    popen_args = {'stdin': None, 'stdout': None, 'stderr': None, 'shell': False, **kwargs}
    return subprocess.Popen(cmd, **popen_args)
def run(cmd: str | list, input: str = None, echo: bool = False, strict: bool = True, timeout: float = None, **kwargs) -> str:
    """Run an external command and wait for it to finish.

    Args:
        cmd: Command to run. A string is split into arguments with ``shlex.split``.
        input: Data to send to the command's stdin. A ``str`` is encoded as UTF-8,
            unless a text mode option (``text``, ``universal_newlines``, ``encoding``
            or ``errors``) is given in ``kwargs``.
        echo: Echo the output instead of capturing it.
        strict: Raise an error on a non-zero exit code.
        timeout: Timeout, passed to ``Popen.communicate``.
        kwargs: Arguments to pass to ``subprocess.Popen``. They override the defaults.

    Returns:
        The captured command output (stderr is redirected to stdout) as a string,
        or an empty string if nothing was captured.

    Raises:
        TimeoutError: If the command does not finish within ``timeout``.
        Error: If the command cannot be started or communicated with,
            or if it exits with a non-zero code and ``strict`` is set.
    """

    args = {
        'stdin': subprocess.PIPE if input else None,
        'stdout': None if echo else subprocess.PIPE,
        'stderr': subprocess.STDOUT,
        'shell': False,
    }
    args.update(kwargs)

    if isinstance(cmd, str):
        cmd = shlex.split(cmd)

    # By default the pipes are binary, so text input must be encoded first,
    # otherwise ``communicate`` fails with a TypeError.
    text_mode = any(args.get(k) for k in ('text', 'universal_newlines', 'encoding', 'errors'))
    if isinstance(input, str) and not text_mode:
        input = input.encode('utf8')

    gws.log.debug(f'RUN: {cmd=}')

    try:
        # The context manager closes the pipes and reaps the child on exit.
        with subprocess.Popen(cmd, **args) as p:
            try:
                out, _ = p.communicate(input, timeout)
            except BaseException:
                # ``communicate`` does not stop the child on timeout (or any other failure):
                # kill it so that it is not left running after we give up on it.
                p.kill()
                raise
            rc = p.returncode
    except subprocess.TimeoutExpired as exc:
        raise TimeoutError('run: command timed out', repr(cmd)) from exc
    except Exception as exc:
        raise Error('run: failure', repr(cmd)) from exc

    if rc:
        gws.log.debug(f'RUN_FAILED: {cmd=} {rc=} {out=}')
        if strict:
            raise Error('run: non-zero exit', repr(cmd))

    return _to_str(out or '')
def unlink(path: _Path) -> bool:
    """Delete a file if it exists.

    Args:
        path: Filepath.

    Returns:
        ``True`` if the file was removed or ``path`` is not an existing file,
        ``False`` if the removal failed with an ``OSError``.
    """
    try:
        if os.path.isfile(path):
            os.remove(path)
    except OSError as exc:
        gws.log.debug(f'OSError: unlink: {exc}')
        return False
    return True
def rename(src: _Path, dst: _Path):
    """Move the source path to the given destination using ``shutil.move``.

    Args:
        src: Path to source.
        dst: Destination.
    """

    source, target = _to_str(src), _to_str(dst)
    shutil.move(source, target)
def chown(path: _Path, user: int = None, group: int = None):
    """Change the owner (UID) and group (GID) of a given path.

    Args:
        path: Filepath.
        user: UID, defaults to ``gws.c.UID`` when not given.
        group: GID, defaults to ``gws.c.GID`` when not given.
    """
    # Compare against None: 0 (root) is a valid id and must not trigger the default.
    uid = gws.c.UID if user is None else user
    gid = gws.c.GID if group is None else group
    os.chown(path, uid, gid)
def copy(src: _Path, dst: _Path, user: int = None, group: int = None):
    """Copy the contents of a file and set the owner of the copy.

    Args:
        src: Source path.
        dst: Destination path.
        user: UID for the destination, defaults to ``gws.c.UID`` when not given.
        group: GID for the destination, defaults to ``gws.c.GID`` when not given.
    """
    shutil.copyfile(src, dst)
    # Compare against None: 0 (root) is a valid id and must not trigger the default.
    uid = gws.c.UID if user is None else user
    gid = gws.c.GID if group is None else group
    os.chown(dst, uid, gid)
def mkdir(path: _Path, mode: int = 0o755, user: int = None, group: int = None):
    """Create a (possibly nested) directory, doing nothing if it already exists.

    Args:
        path: Path to a directory.
        mode: Directory creation mode.
        user: Directory user. Accepted, but not applied by this implementation.
        group: Directory group. Accepted, but not applied by this implementation.
    """

    os.makedirs(path, mode=mode, exist_ok=True)
def rmdir(path: _Path) -> bool:
    """Remove a directory together with everything inside it.

    Args:
        path: Path to a directory. Can be non-empty.

    Returns:
        ``True`` if the tree was removed, ``False`` if removal failed with an ``OSError``.
    """

    try:
        shutil.rmtree(path)
    except OSError as exc:
        gws.log.warning(f'OSError: rmdir: {exc}')
        return False
    return True
def file_mtime(path: _Path) -> float:
    """Return the modification time of a path.

    Args:
        path: File-/directory-path.

    Returns:
        The ``st_mtime`` of the path (seconds since the epoch),
        or ``-1`` if the path cannot be stat'ed.
    """
    try:
        return os.path.getmtime(path)
    except OSError as exc:
        gws.log.debug(f'OSError: file_mtime: {exc}')
    return -1
def file_age(path: _Path) -> int:
    """Return the number of whole seconds elapsed since the path was last modified.

    Args:
        path: Filepath.

    Returns:
        Seconds between now and the path's ``st_mtime``,
        or ``-1`` if the path cannot be stat'ed.
    """
    try:
        now = time.time()
        modified = os.stat(path).st_mtime
    except OSError as exc:
        gws.log.debug(f'OSError: file_age: {exc}')
        return -1
    return int(now - modified)
def file_size(path: _Path) -> int:
    """Return the size of a file.

    Args:
        path: Filepath.

    Returns:
        The ``st_size`` of the path (size in bytes),
        or ``-1`` if the path cannot be stat'ed.
    """
    try:
        return os.path.getsize(path)
    except OSError as exc:
        gws.log.debug(f'OSError: file_size: {exc}')
    return -1
def file_checksum(path: _Path) -> str:
    """Return the SHA-256 checksum of a file.

    The file is hashed in fixed-size chunks, so large files are not loaded
    into memory as a whole.

    Args:
        path: Filepath.

    Returns:
        The hexadecimal SHA-256 digest of the file contents,
        or an empty string if the file cannot be read.
    """
    try:
        digest = hashlib.sha256()
        with open(path, 'rb') as fp:
            while chunk := fp.read(65536):
                digest.update(chunk)
        return digest.hexdigest()
    except OSError as exc:
        gws.log.debug(f'OSError: file_checksum: {exc}')
        return ''
def kill_pid(pid: int, sig_name='TERM') -> bool:
    """Send a signal to a process.

    Args:
        pid: Process ID.
        sig_name: Name of the signal in the ``signal`` module,
            with or without the ``SIG`` prefix (e.g. ``SIGTERM`` or ``TERM``).

    Returns:
        ``True`` if the signal was sent or the process does not exist,
        ``False`` if sending the signal failed.
    """
    sig = getattr(signal, sig_name, None)
    if not sig:
        # not found under the given name, retry with the conventional prefix
        sig = getattr(signal, 'SIG' + sig_name)

    try:
        psutil.Process(pid).send_signal(sig)
    except psutil.NoSuchProcess:
        return True
    except psutil.Error as e:
        gws.log.warning(f'send_signal failed, pid={pid!r}, {e}')
        return False
    return True
def running_pids() -> dict[int, str]:
    """Return the pids of the running processes mapped to the process names.

    Processes that exit or deny access while being inspected are skipped.
    """
    pids = {}
    for p in psutil.process_iter():
        try:
            pids[p.pid] = p.name()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # A process can exit (or deny access) between being listed and being queried.
            continue
    return pids
def process_rss_size(unit: str = 'm') -> float:
    """Return the Resident Set Size of the current process.

    Args:
        unit: ``m`` | ``k`` | ``g``

    Returns:
        The ``rss`` value reported by ``psutil``, divided by 1e3 (``k``),
        1e6 (``m``) or 1e9 (``g``). Any other unit returns the value unscaled.
    """
    rss = psutil.Process().memory_info().rss
    divisor = {'k': 1e3, 'm': 1e6, 'g': 1e9}.get(unit)
    if divisor is None:
        return rss
    return rss / divisor
def user_info(uid=None, gid=None) -> dict:
    """Get user and group information.

    Args:
        uid: Optional user ID. Defaults to the current process's user ID.
        gid: Optional group ID. Defaults to the user's primary group ID.

    Returns:
        A dictionary containing user and group information
        (a combination of struct_passwd and struct_group) with the keys
        ``pw_name``, ``pw_uid``, ``pw_gid``, ``pw_dir``, ``pw_shell``,
        ``gr_name`` and ``gr_gid``.
    """

    # Compare against None: 0 (root) is a valid id and must not trigger the default.
    if uid is None:
        uid = os.getuid()
    u = pwd.getpwuid(uid)

    r = dict(
        pw_name=u.pw_name,
        pw_uid=u.pw_uid,
        pw_gid=u.pw_gid,
        pw_dir=u.pw_dir,
        pw_shell=u.pw_shell,
    )

    if gid is None:
        gid = r['pw_gid']
    g = grp.getgrgid(gid)

    r['gr_name'] = g.gr_name
    r['gr_gid'] = g.gr_gid

    return r
def find_files(dirname: _Path, pattern=None, ext=None, deep: bool = True):
    """Find files in a given directory.

    Entries whose name starts with a dot (files and directories) are skipped.

    Args:
        dirname: Path to directory.
        pattern: Regular expression, searched in the full path of each file.
        ext: Extension or list of extensions to match. Only used when no
            ``pattern`` is given; it is inserted into a regular expression as is.
        deep: If true then searches through all subdirectories for files,
            otherwise it returns the files only in the given directory.

    Returns:
        A generator yielding the paths of the matching files.
    """
    if not pattern and ext:
        if isinstance(ext, (list, tuple)):
            ext = '|'.join(ext)
        pattern = '\\.(' + ext + ')$'

    # The context manager closes the directory handle deterministically,
    # also when the generator is closed before being exhausted.
    with os.scandir(dirname) as entries:
        de: os.DirEntry
        for de in entries:
            if de.name.startswith('.'):
                continue

            if de.is_dir() and deep:
                yield from find_files(de.path, pattern)
                continue

            if de.is_file() and (pattern is None or re.search(pattern, de.path)):
                yield de.path
def find_directories(dirname: _Path, pattern=None, deep: bool = True):
    """Find directories in a given directory.

    Directories whose name starts with a dot are skipped (and not descended into).

    Args:
        dirname: Path to directory.
        pattern: Regular expression, searched in the full path of each directory.
        deep: If true then searches through all subdirectories for directories,
            otherwise it returns the directories only in the given directory.

    Returns:
        A generator yielding the paths of the matching directories.
    """
    # The context manager closes the directory handle deterministically,
    # also when the generator is closed before being exhausted.
    with os.scandir(dirname) as entries:
        de: os.DirEntry
        for de in entries:
            if de.name.startswith('.'):
                continue

            if not de.is_dir():
                continue

            if pattern is None or re.search(pattern, de.path):
                yield de.path

            if deep:
                yield from find_directories(de.path, pattern)
def parse_path(path: _Path) -> dict[str, str]:
    """Parse a path into a dict(dirname,filename,name,extension).

    A filename starting with a dot is taken as the name, with an empty extension.
    Otherwise the name is the part before the first dot and the extension
    is everything after it.

    Args:
        path: Path.

    Returns:
        A dict(dirname,filename,name,extension).
    """

    dirname, filename = os.path.split(_to_str(path))

    if filename.startswith('.'):
        name, extension = filename, ''
    else:
        name, _, extension = filename.partition('.')

    return {
        'dirname': dirname,
        'filename': filename,
        'name': name,
        'extension': extension,
    }
def file_name(path: _Path) -> str:
    """Return the last component of a path.

    Args:
        path: Filepath.

    Returns:
        The filename, i.e. everything after the final path separator.
    """

    return os.path.basename(_to_str(path))
def is_abs_path(path: _Path) -> bool:
    """Check if a path is absolute.

    Args:
        path: A path.

    Returns:
        The result of ``os.path.isabs`` for the path.
    """
    absolute = os.path.isabs(path)
    return absolute
def abs_path(path: _Path, base: _Path) -> str:
    """Absolutize a relative path with respect to a base directory or file path.

    An already absolute path is returned as is. If the base is an existing file,
    its directory is used as the base.

    Args:
        path: A path.
        base: A path to the base.

    Raises:
        ``ValueError``: If the path is relative and base is empty.

    Returns:
        The absolute path.
    """

    str_path = _to_str(path)
    if os.path.isabs(str_path):
        return str_path

    if not base:
        raise ValueError('cannot compute abspath without a base')

    base_dir = os.path.dirname(base) if os.path.isfile(base) else base
    joined = os.path.join(_to_str(base_dir), str_path)
    return os.path.abspath(joined)
def abs_web_path(path: str, basedir: str) -> Optional[str]:
    """Return an absolute path in a base dir and ensure the path is correct.

    The path is split on ``/``; empty and whitespace-only segments are dropped.
    Directory segments may only contain letters, digits, ``_`` and ``-``.
    The last segment (the filename) may additionally have dot-separated
    suffixes made of lowercase letters and digits.

    Args:
        path: Path to absolutize.
        basedir: Path to base directory.

    Returns:
        Absolute path with respect to base directory, or ``None`` if the path is empty,
        contains invalid segments or does not point to an existing file.
    """

    _dir_re = r'^[A-Za-z0-9_-]+$'
    _fil_re = r'^[A-Za-z0-9_-]+(\.[a-z0-9]+)*$'

    gws.log.debug(f'abs_web_path: trying {path!r} in {basedir!r}')

    parts = [s for s in (seg.strip() for seg in path.split('/')) if s]

    # An empty or slash-only path has no filename segment at all.
    if not parts:
        gws.log.warning(f'abs_web_path: empty path={path!r}')
        return None

    *dirs, fname = parts

    if not all(re.match(_dir_re, d) for d in dirs):
        gws.log.warning(f'abs_web_path: invalid dirname in path={path!r}')
        return None

    if not re.match(_fil_re, fname):
        gws.log.warning(f'abs_web_path: invalid filename in path={path!r}')
        return None

    p = '/'.join([basedir, *dirs, fname])

    if not os.path.isfile(p):
        gws.log.warning(f'abs_web_path: not a file path={path!r}')
        return None

    return p
def rel_path(path: _Path, base: _Path) -> str:
    """Relativize a path with respect to a base directory or file path.

    If the base is an existing file, its directory is used as the base.

    Args:
        path: Path to relativize.
        base: Path to base directory.

    Returns:
        Relativized path with respect to base directory.
    """

    base_dir = os.path.dirname(base) if os.path.isfile(base) else base
    return os.path.relpath(_to_str(path), _to_str(base_dir))
532def _to_str(p: _Path) -> str:
533 return p if isinstance(p, str) else bytes(p).decode('utf8')
536def _to_bytes(p: _Path) -> bytes:
537 return p if isinstance(p, bytes) else str(p).encode('utf8')