Coverage for gws-app/gws/lib/watcher/__init__.py: 100%
67 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-16 23:09 +0200
1"""File system watcher.
3Monitor file system events such as file creation, deletion, modification, and movement.
4The module allows directories or specific files to be monitored, supports pattern matching for files,
5and provides a mechanism to trigger custom notification callbacks when events occur.
6"""
8from typing import Callable, TypeAlias
9import os
10import re
12import watchdog.events
13import watchdog.observers
15import gws
17_WATCH_EVENTS = {
18 watchdog.events.EVENT_TYPE_MOVED,
19 watchdog.events.EVENT_TYPE_DELETED,
20 watchdog.events.EVENT_TYPE_CREATED,
21 watchdog.events.EVENT_TYPE_MODIFIED,
22}
24_EVENTS = []
27class _DirEntry:
28 def __init__(self, dirname, pattern, recursive):
29 self.dirname = dirname
30 self.pattern = pattern
31 self.recursive = recursive
34_NotifyFn: TypeAlias = Callable[[str, str], None]
37def new(notify: _NotifyFn):
38 """Create a new watcher.
40 Args:
41 notify: A callback function that will be called when an event occurs.
42 It should accept two arguments: the event type and the path of the file.
43 Note that the callback will be called from a different thread than the one that created the watcher.
44 """
45 return Watcher(notify)
48class Watcher:
49 observer: watchdog.observers.Observer
51 def __init__(self, notify: _NotifyFn):
52 self.notify = notify
53 self.dirEntries = {}
54 self.filePaths = set()
55 self.excludePatterns = []
57 def add_directory(self, dirname: str | os.PathLike, file_pattern: str = '', recursive: bool = False):
58 d = str(dirname)
59 self.dirEntries[d] = _DirEntry(d, file_pattern or '.', recursive)
61 def add_file(self, filename: str | os.PathLike):
62 self.filePaths.add(str(filename))
64 def exclude(self, path_pattern: str):
65 self.excludePatterns.append(path_pattern)
67 def start(self):
68 self.observer = watchdog.observers.Observer()
70 h = _Handler(self)
72 for de in self.dirEntries.values():
73 gws.log.debug(f'watcher: watching {de.dirname!r}')
74 self.observer.schedule(h, de.dirname, recursive=de.recursive)
76 for f in self.filePaths:
77 gws.log.debug(f'watcher: watching {f!r}')
78 self.observer.schedule(h, os.path.dirname(f), recursive=False)
80 self.observer.start()
81 gws.log.debug(f'watcher: started with {self.observer.__class__.__name__}')
83 def stop(self):
84 if self.observer is not None:
85 self.observer.stop()
86 self.observer.join()
87 gws.log.debug(f'watcher: stopped')
89 def register(self, ev: watchdog.events.FileSystemEvent):
90 if self.path_matches(ev.src_path):
91 gws.log.debug(f'watcher: {ev.event_type} {ev.src_path}')
92 self.notify(ev.event_type, ev.src_path)
94 def path_matches(self, path):
95 if any(re.search(ex, path) for ex in self.excludePatterns):
96 return False
97 if path in self.filePaths:
98 return True
99 d, f = os.path.split(path)
100 for de in self.dirEntries.values():
101 if (d == de.dirname or (de.recursive and d.startswith(de.dirname + '/'))) and re.search(de.pattern, f):
102 return True
103 return False
106class _Handler(watchdog.events.FileSystemEventHandler):
107 def __init__(self, obj: Watcher):
108 super().__init__()
109 self.obj = obj
111 def on_any_event(self, ev):
112 if ev.event_type in _WATCH_EVENTS:
113 self.obj.register(ev)