Coverage for gws-app/gws/core/util.py: 46%
591 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 22:59 +0200
1"""Core utilities
3Most common function which are needed everywhere.
5This module is available as ``gws.u`` everywhere.
6"""
8import hashlib
9import json
10import os
11import pickle
12import random
13import re
14import shutil
15import sys
16import threading
17import time
18import urllib.parse
19from typing import Optional, TypeVar, Union, cast
21from . import const, log
24def is_data_object(x) -> bool:
25 return False
28def to_data_object(x):
29 pass
32def exit(code: int = 255):
33 """Exit the application.
35 Args:
36 code: Exit code.
37 """
39 sys.exit(code)
42T = TypeVar('T')
45def require(value: Optional[T]) -> T:
46 """Return the value if not None, otherwise raise an Exception."""
47 if value is None:
48 raise ValueError('unexpected None value')
49 return value
52##
54# @TODO use ABC
57def is_list(x):
58 return isinstance(x, (list, tuple))
61def is_dict(x):
62 return isinstance(x, dict)
65def is_bytes(x):
66 return isinstance(x, (bytes, bytearray))
67 # @TODO how to handle bytes-alikes?
68 # return hasattr(x, 'decode')
71def is_atom(x):
72 return x is None or isinstance(x, (int, float, bool, str, bytes))
75def is_empty(x) -> bool:
76 """Check if the value is empty (None, empty list/dict/object)."""
78 if x is None:
79 return True
80 try:
81 return len(x) == 0
82 except TypeError:
83 pass
84 try:
85 return not vars(x)
86 except TypeError:
87 pass
88 return False
91##
94def get(x, key, default=None):
95 """Get a nested value/attribute from a structure.
97 Args:
98 x: A dict, list or Data.
99 key: A list or a dot separated string of nested keys.
100 default: The default value.
102 Returns:
103 The value if it exists and the default otherwise.
104 """
106 if not x:
107 return default
108 if isinstance(key, str):
109 key = key.split('.')
110 try:
111 return _get(x, key)
112 except (KeyError, IndexError, AttributeError, ValueError):
113 return default
116def has(x, key) -> bool:
117 """True if a nested value/attribute exists in a structure.
119 Args:
120 x: A dict, list or Data.
121 key: A list or a dot separated string of nested keys.
123 Returns:
124 True if a key exists
125 """
127 if not x:
128 return False
129 if isinstance(key, str):
130 key = key.split('.')
131 try:
132 _get(x, key)
133 return True
134 except (KeyError, IndexError, AttributeError, ValueError):
135 return False
138def _get(x, keys):
139 for k in keys:
140 if is_dict(x):
141 x = x[k]
142 elif is_list(x):
143 x = x[int(k)]
144 elif is_data_object(x):
145 # special case: raise a KeyError if the attribute is truly missing in a Data
146 # (and not just equals to None)
147 x = vars(x)[k]
148 else:
149 x = getattr(x, k)
150 return x
153def pop(x, key, default=None):
154 if is_dict(x):
155 return x.pop(key, default)
156 if is_data_object(x):
157 return vars(x).pop(key, default)
158 return default
161def pick(x, *keys):
162 def _pick(d):
163 r = {}
164 for k in keys:
165 if k in d:
166 r[k] = d[k]
167 return r
169 if is_dict(x):
170 return _pick(x)
171 if is_data_object(x):
172 return type(x)(_pick(vars(x)))
173 return {}
176def omit(x, *keys):
177 def _omit(d):
178 r = {}
179 for k, v in d.items():
180 if k not in keys:
181 r[k] = d[k]
182 return r
184 if is_dict(x):
185 return _omit(x)
186 if is_data_object(x):
187 return type(x)(_omit(vars(x)))
188 return {}
191def collect(pairs):
192 m = {}
194 for key, val in pairs:
195 if key is not None:
196 m.setdefault(key, []).append(val)
198 return m
201def first(it):
202 for x in it:
203 return x
206def first_not_none(*args):
207 for a in args:
208 if a is not None:
209 return a
212def merge(*args, **kwargs) -> Union[dict, 'Data']:
213 """Create a new dict/Data object by merging values from dicts/Datas or kwargs.
214 Latter vales overwrite former ones unless None.
216 Args:
217 *args: dicts or Datas.
218 **kwargs: Keyword args.
220 Returns:
221 A new object (dict or Data).
222 """
224 def _merge(arg):
225 for k, v in to_dict(arg).items():
226 if v is not None:
227 m[k] = v
229 m = {}
231 for a in args:
232 if a:
233 _merge(a)
234 if kwargs:
235 _merge(kwargs)
237 if not args or isinstance(args[0], dict) or args[0] is None:
238 return m
239 return type(args[0])(m)
242def deep_merge(x, y, concat_lists=True):
243 """Deeply merge dicts/Datas into a nested dict/Data.
244 Latter vales overwrite former ones unless None.
246 Args:
247 x: dict or Data.
248 y: dict or Data.
249 concat_lists: if true, list will be concatenated, otherwise merged
251 Returns:
252 A new object (dict or Data).
253 """
255 if (is_dict(x) or is_data_object(x)) and (is_dict(y) or is_data_object(y)):
256 xd = to_dict(x)
257 yd = to_dict(y)
258 d = {k: deep_merge(xd.get(k), yd.get(k), concat_lists) for k in xd.keys() | yd.keys()}
259 return d if is_dict(x) else type(x)(d)
261 if is_list(x) and is_list(y):
262 xc = compact(x)
263 yc = compact(y)
264 if concat_lists:
265 return xc + yc
266 return [deep_merge(x1, y1, concat_lists) for x1, y1 in zip(xc, yc)]
268 return y if y is not None else x
271def compact(x):
272 """Remove all None values from a collection."""
274 if is_dict(x):
275 return {k: v for k, v in x.items() if v is not None}
276 if is_data_object(x):
277 d = {k: v for k, v in vars(x).items() if v is not None}
278 return type(x)(d)
279 return [v for v in x if v is not None]
282def strip(x):
283 """Strip all strings and remove empty values from a collection."""
285 def _strip(v):
286 if isinstance(v, (str, bytes, bytearray)):
287 return v.strip()
288 return v
290 def _dict(x1):
291 d = {}
292 for k, v in x1.items():
293 v = _strip(v)
294 if not is_empty(v):
295 d[k] = v
296 return d
298 if is_dict(x):
299 return _dict(x)
300 if is_data_object(x):
301 return type(x)(_dict(vars(x)))
303 r = [_strip(v) for v in x]
304 return [v for v in r if not is_empty(v)]
307def uniq(x):
308 """Remove duplicate elements from a collection."""
310 s = set()
311 r = []
313 for y in x:
314 try:
315 if y not in s:
316 s.add(y)
317 r.append(y)
318 except TypeError:
319 if y not in r:
320 r.append(y)
322 return r
325##
328def to_int(x) -> int:
329 """Convert a value to an int or 0 if this fails."""
331 try:
332 return int(x)
333 except:
334 return 0
337def to_rounded_int(x) -> int:
338 """Round and convert a value to an int or 0 if this fails."""
340 try:
341 if isinstance(x, float):
342 return int(round(x))
343 return int(x)
344 except:
345 return 0
348def to_float(x) -> float:
349 """Convert a value to a float or 0.0 if this fails."""
351 try:
352 return float(x)
353 except:
354 return 0.0
357def to_str(x, encodings: list[str] = None) -> str:
358 """Convert a value to a string.
360 Args:
361 x: Value.
362 encodings: A list of acceptable encodings. If the value is bytes, try each encoding,
363 and return the first one which passes without errors.
365 Returns:
366 A string.
367 """
369 if isinstance(x, str):
370 return x
371 if x is None:
372 return ''
373 if not is_bytes(x):
374 return str(x)
375 if encodings:
376 for enc in encodings:
377 try:
378 return x.decode(encoding=enc, errors='strict')
379 except UnicodeDecodeError:
380 pass
381 return x.decode(encoding='utf-8', errors='ignore')
384def to_bytes(x, encoding='utf8') -> bytes:
385 """Convert a value to bytes by converting it to string and encoding."""
387 if is_bytes(x):
388 return bytes(x)
389 if x is None:
390 return b''
391 if not isinstance(x, str):
392 x = str(x)
393 return x.encode(encoding or 'utf8')
396def to_list(x, delimiter: str = ',') -> list:
397 """Convert a value to a list.
399 Args:
400 x: A value. Is it's a string, split it by the delimiter
401 delimiter:
403 Returns:
404 A list.
405 """
407 if isinstance(x, list):
408 return x
409 if is_empty(x):
410 return []
411 if is_bytes(x):
412 x = to_str(x)
413 if isinstance(x, str):
414 if delimiter:
415 ls = [s.strip() for s in x.split(delimiter)]
416 return [s for s in ls if s]
417 return [x]
418 if isinstance(x, (int, float, bool)):
419 return [x]
420 try:
421 return [s for s in x]
422 except TypeError:
423 return []
426def to_dict(x) -> dict:
427 """Convert a value to a dict. If the argument is an object, return its `dict`."""
429 if is_dict(x):
430 return x
431 if x is None:
432 return {}
433 try:
434 f = getattr(x, '_asdict', None)
435 if f:
436 return f()
437 return vars(x)
438 except TypeError:
439 raise ValueError(f'cannot convert {x!r} to dict')
442def to_upper_dict(x) -> dict:
443 x = to_dict(x)
444 return {k.upper(): v for k, v in x.items()}
447def to_lower_dict(x) -> dict:
448 x = to_dict(x)
449 return {k.lower(): v for k, v in x.items()}
452##
454_UID_DE_TRANS = {
455 ord('ä'): 'ae',
456 ord('ö'): 'oe',
457 ord('ü'): 'ue',
458 ord('ß'): 'ss',
459}
462def to_uid(x) -> str:
463 """Convert a value to an uid (alphanumeric string)."""
465 if not x:
466 return ''
467 x = to_str(x).lower().strip().translate(_UID_DE_TRANS)
468 x = re.sub(r'[^a-z0-9]+', '_', x)
469 return x.strip('_')
472def to_lines(txt: str, comment: str = None) -> list[str]:
473 """Convert a multiline string into a list of strings.
475 Strip each line, skip empty lines, if `comment` is given, also remove lines starting with it.
476 """
478 ls = []
480 for s in txt.splitlines():
481 if comment and comment in s:
482 s = s.split(comment)[0]
483 s = s.strip()
484 if s:
485 ls.append(s)
487 return ls
490##
493def parse_acl(acl):
494 """Parse an ACL config into an ACL.
496 Args:
497 acl: an ACL config. Can be given as a string ``allow X, allow Y, deny Z``,
498 or as a list of dicts ``{ role X type allow }, { role Y type deny }``,
499 or it can already be an ACL ``[1 X], [0 Y]``,
500 or it can be None.
502 Returns:
503 Access list.
504 """
506 if not acl:
507 return []
509 a = 'allow'
510 d = 'deny'
511 bits = {const.ALLOW, const.DENY}
512 err = 'invalid ACL'
514 access = []
516 if isinstance(acl, str):
517 for p in acl.strip().split(','):
518 s = p.strip().split()
519 if len(s) != 2:
520 raise ValueError(err)
521 if s[0] == a:
522 access.append((const.ALLOW, s[1]))
523 elif s[0] == d:
524 access.append((const.DENY, s[1]))
525 else:
526 raise ValueError(err)
527 return access
529 if not isinstance(acl, list):
530 raise ValueError(err)
532 if isinstance(acl[0], (list, tuple)):
533 try:
534 if all(len(s) == 2 and s[0] in bits for s in acl):
535 return acl
536 except (TypeError, IndexError):
537 pass
538 raise ValueError(err)
540 if isinstance(acl[0], dict):
541 for s in acl:
542 tk = s.get('type', '')
543 rk = s.get('role', '')
544 if not isinstance(rk, str):
545 raise ValueError(err)
546 if tk == a:
547 access.append((const.ALLOW, rk))
548 elif tk == d:
549 access.append((const.DENY, rk))
550 else:
551 raise ValueError(err)
552 return access
554 raise ValueError(err)
557##
559UID_DELIMITER = '::'
562def join_uid(parent_uid, object_uid):
563 p = parent_uid.split(UID_DELIMITER)
564 u = object_uid.split(UID_DELIMITER)
565 return p[-1] + UID_DELIMITER + u[-1]
568def split_uid(joined_uid: str) -> tuple[str, str]:
569 p, _, u = joined_uid.partition(UID_DELIMITER)
570 return p, u
573##
576def is_file(path):
577 return os.path.isfile(path)
580def is_dir(path):
581 return os.path.isdir(path)
584def read_file(path: str) -> str:
585 try:
586 with open(path, 'rt', encoding='utf8') as fp:
587 return fp.read()
588 except Exception as exc:
589 log.debug(f'error reading {path=} {exc=}')
590 raise
593def read_file_b(path: str) -> bytes:
594 try:
595 with open(path, 'rb') as fp:
596 return fp.read()
597 except Exception as exc:
598 log.debug(f'error reading {path=} {exc=}')
599 raise
602def write_file(path: str, s: str, user: int = None, group: int = None):
603 try:
604 with open(path, 'wt', encoding='utf8') as fp:
605 fp.write(s)
606 chown_default(path, user, group)
607 return path
608 except Exception as exc:
609 log.debug(f'error writing {path=} {exc=}')
610 raise
613def write_file_b(path: str, s: bytes, user: int = None, group: int = None):
614 try:
615 with open(path, 'wb') as fp:
616 fp.write(s)
617 chown_default(path, user, group)
618 return path
619 except Exception as exc:
620 log.debug(f'error writing {path=} {exc=}')
621 raise
624def write_debug_file(path: str, s: str | bytes):
625 """Write a debug file with the given content."""
627 if isinstance(s, str):
628 s = s.encode('utf8')
629 try:
630 d = ensure_dir(f'{const.VAR_DIR}/debug')
631 with open(f'{d}/{path}', 'wb') as fp:
632 fp.write(s)
633 except Exception as exc:
634 log.debug(f'error writing debug {path=} {exc=}')
637def dirname(path):
638 return os.path.dirname(path)
641def ensure_dir(dir_path: str, base_dir: str = None, mode: int = 0o755, user: int = None, group: int = None) -> str:
642 """Check if a (possibly nested) directory exists and create if it does not.
644 Args:
645 dir_path: Path to a directory.
646 base_dir: Base directory.
647 mode: Directory creation mode.
648 user: Directory user (defaults to gws.c.UID)
649 group: Directory group (defaults to gws.c.GID)
651 Returns:
652 The absolute path to the directory.
653 """
655 if base_dir:
656 if os.path.isabs(dir_path):
657 raise ValueError(f'cannot use an absolute path {dir_path!r} with a base dir')
658 bpath = cast(bytes, os.path.join(base_dir.encode('utf8'), dir_path.encode('utf8')))
659 else:
660 if not os.path.isabs(dir_path):
661 raise ValueError(f'cannot use a relative path {dir_path!r} without a base dir')
662 bpath = dir_path.encode('utf8')
664 parts = []
666 for p in bpath.split(b'/'):
667 parts.append(p)
668 path = b'/'.join(parts)
669 if path and not os.path.isdir(path):
670 os.mkdir(path, mode)
672 chown_default(bpath, user, group)
673 return bpath.decode('utf8')
676def ensure_system_dirs():
677 for d in const.ALL_DIRS:
678 ensure_dir(d)
681def chown_default(path, user=None, group=None):
682 try:
683 os.chown(path, user or const.UID, group or const.GID)
684 except OSError:
685 pass
688_ephemeral_state = dict(
689 last_check_time=0,
690 check_interval=20 * 30,
691 max_age=20 * 30,
692)
695def ephemeral_path(name: str) -> str:
696 """Return a new ephemeral path name."""
698 # if stime() > _ephemeral_state['last_check_time'] + _ephemeral_state['check_interval']:
699 # _ephemeral_cleanup()
700 #
701 name = str(os.getpid()) + '_' + random_string(64) + '_' + name
702 return const.EPHEMERAL_DIR + '/' + name
705def ephemeral_dir(name: str) -> str:
706 """Create and return an ephemeral directory."""
708 return ensure_dir(const.EPHEMERAL_DIR + '/' + name)
711def ephemeral_cleanup():
712 """Remove ephemeral paths older than max age."""
714 cnt = 0
715 ts = stime()
717 for de in os.scandir(const.EPHEMERAL_DIR):
718 age = int(ts - de.stat().st_mtime)
719 if age > _ephemeral_state['max_age']:
720 try:
721 if de.is_dir():
722 shutil.rmtree(de.path)
723 else:
724 os.unlink(de.path)
725 cnt += 1
726 except OSError:
727 pass
729 _ephemeral_state['last_check_time'] = ts
731 if cnt > 0:
732 log.debug(f'_ephemeral_cleanup: {cnt}')
735def random_string(size: int) -> str:
736 """Generate a random string of length `size`."""
738 a = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
739 r = random.SystemRandom()
740 return ''.join(r.choice(a) for _ in range(size))
743class _FormatMapDefault:
744 def __init__(self, d, default):
745 self.d = d
746 self.default = default
748 def __getitem__(self, item):
749 val = self.d.get(item)
750 return val if val is not None else self.default
753def format_map(fmt: str, x: Union[dict, 'Data'], default: str = '') -> str:
754 return fmt.format_map(_FormatMapDefault(x, default))
757def sha256(x):
758 def _bytes(x):
759 if is_bytes(x):
760 return bytes(x)
761 if isinstance(x, (int, float, bool)):
762 return str(x).encode('utf8')
763 if isinstance(x, str):
764 return x.encode('utf8')
766 def _default(x):
767 if is_data_object(x):
768 return vars(x)
769 return str(x)
771 c = _bytes(x)
772 if c is None:
773 j = json.dumps(x, default=_default, sort_keys=True, ensure_ascii=True)
774 c = j.encode('utf8')
776 return hashlib.sha256(c).hexdigest()
779class cached_property:
780 """Decorator for a cached property."""
782 def __init__(self, fn):
783 self._fn = fn
784 self.__doc__ = getattr(fn, '__doc__')
786 def __get__(self, obj, objtype=None):
787 value = self._fn(obj)
788 setattr(obj, self._fn.__name__, value)
789 return value
792# application lock/globals are global to one application
793# server locks lock the whole server
794# server globals are pickled in /tmp
797_app_lock = threading.RLock()
800def app_lock(name=''):
801 return _app_lock
804_app_globals: dict = {}
807def get_app_global(name, init_fn):
808 if name in _app_globals:
809 return _app_globals[name]
811 with app_lock(name):
812 if name not in _app_globals:
813 _app_globals[name] = init_fn()
815 return _app_globals[name]
818def set_app_global(name, value):
819 with app_lock(name):
820 _app_globals[name] = value
821 return _app_globals[name]
824def delete_app_global(name):
825 with app_lock(name):
826 _app_globals.pop(name, None)
829##
832def serialize_to_path(obj, path):
833 tmp = path + random_string(64)
834 with open(tmp, 'wb') as fp:
835 pickle.dump(obj, fp)
836 os.replace(tmp, path)
837 chown_default(path)
838 return path
841def unserialize_from_path(path):
842 with open(path, 'rb') as fp:
843 return pickle.load(fp)
846_server_globals = {}
849def get_cached_object(name: str, life_time: int, init_fn):
850 uid = to_uid(name)
851 path = const.OBJECT_CACHE_DIR + '/' + uid
853 def _get():
854 if not os.path.isfile(path):
855 return
856 try:
857 age = int(time.time() - os.stat(path).st_mtime)
858 except OSError:
859 return
860 if age < life_time:
861 try:
862 obj = unserialize_from_path(path)
863 log.debug(f'get_cached_object {uid!r} {life_time=} {age=} - loaded')
864 return obj
865 except:
866 log.exception(f'get_cached_object {uid!r} LOAD ERROR')
868 obj = _get()
869 if obj:
870 return obj
872 with server_lock(uid):
873 obj = _get()
874 if obj:
875 return obj
877 obj = init_fn()
878 try:
879 serialize_to_path(obj, path)
880 log.debug(f'get_cached_object {uid!r} - stored')
881 except:
882 log.exception(f'get_cached_object {uid!r} STORE ERROR')
884 return obj
887def get_server_global(name: str, init_fn):
888 uid = to_uid(name)
889 path = const.GLOBALS_DIR + '/' + uid
891 def _get():
892 if uid in _server_globals:
893 log.debug(f'get_server_global {uid!r} - found')
894 return True
896 if os.path.isfile(path):
897 try:
898 _server_globals[uid] = unserialize_from_path(path)
899 log.debug(f'get_server_global {uid!r} - loaded')
900 return True
901 except:
902 log.exception(f'get_server_global {uid!r} LOAD ERROR')
904 if _get():
905 return _server_globals[uid]
907 with server_lock(uid):
908 if _get():
909 return _server_globals[uid]
911 _server_globals[uid] = init_fn()
913 try:
914 serialize_to_path(_server_globals[uid], path)
915 log.debug(f'get_server_global {uid!r} - stored')
916 except:
917 log.exception(f'get_server_global {uid!r} STORE ERROR')
919 return _server_globals[uid]
922class _FileLock:
923 _PAUSE = 2
924 _TIMEOUT = 60
926 def __init__(self, uid):
927 self.uid = to_uid(uid)
928 self.path = const.LOCKS_DIR + '/' + self.uid
930 def __enter__(self):
931 self.acquire()
932 log.debug(f'server lock {self.uid!r} ACQUIRED')
934 def __exit__(self, exc_type, exc_val, exc_tb):
935 self.release()
937 def acquire(self):
938 ts = time.time()
940 while True:
941 try:
942 fp = os.open(self.path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
943 os.write(fp, bytes(os.getpid()))
944 os.close(fp)
945 return
946 except FileExistsError:
947 pass
949 t = time.time() - ts
951 if t > self._TIMEOUT:
952 raise ValueError('lock timeout', self.uid)
954 log.debug(f'server lock {self.uid!r} WAITING time={t:.3f}')
955 time.sleep(self._PAUSE)
957 def release(self):
958 try:
959 os.unlink(self.path)
960 log.debug(f'server lock {self.uid!r} RELEASED')
961 except:
962 log.exception(f'server lock {self.uid!r} RELEASE ERROR')
965def server_lock(uid):
966 return _FileLock(uid)
969##
972def action_url_path(name: str, **kwargs) -> str:
973 ls = []
975 for k, v in kwargs.items():
976 if not is_empty(v):
977 ls.append(urllib.parse.quote(k))
978 ls.append(urllib.parse.quote(to_str(v)))
980 path = const.SERVER_ENDPOINT + '/' + name
981 if ls:
982 path += '/' + '/'.join(ls)
983 return path
986##
989def utime() -> float:
990 """Unix time as a float number."""
991 return time.time()
994def stime() -> int:
995 """Unix time as an integer number of seconds."""
996 return int(time.time())
999def sleep(n: float):
1000 """Sleep for n seconds."""
1001 time.sleep(n)
1004def mstime() -> int:
1005 """Unix time as an integer number of milliseconds."""
1006 return int(time.time() * 1000)
1009def microtime() -> int:
1010 """Unix time as an integer number of microseconds."""
1011 return int(time.time() * 1000000)